diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto index 52e21d7..142dc45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto @@ -72,4 +72,5 @@ message ApplicationStateDataProto { message ApplicationAttemptStateDataProto { optional ApplicationAttemptIdProto attemptId = 1; optional ContainerProto master_container = 2; + optional bytes app_attempt_tokens = 3; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dcb6f2..14970f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -315,10 +315,6 @@ public void recover(RMState state) throws Exception { LOG.info("Recovering " + appStates.size() + " applications"); for(ApplicationState appState : appStates.values()) { boolean shouldRecover = true; - // re-submit the application - // this is going to send an app start event but since the async dispatcher - // has not started that event will be queued until we have completed re - // populating the state if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { // do not recover unmanaged applications since current recovery // mechanism of restarting attempts does not work for them. @@ -348,6 +344,10 @@ public void recover(RMState state) throws Exception { shouldRecover = false; } + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state if(shouldRecover) { LOG.info("Recovering application " + appState.getAppId()); submitApplication(appState.getApplicationSubmissionContext(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index a9d40eb..0ad403b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; @@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -202,25 +200,16 @@ private void setupTokensAndEnv( credentials.readTokenStorageStream(dibb); } - ApplicationTokenIdentifier id = new ApplicationTokenIdentifier( - application.getAppAttemptId()); - Token appMasterToken = - new Token(id, - this.rmContext.getApplicationTokenSecretManager()); - InetSocketAddress serviceAddr = conf.getSocketAddr( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - // normally the client should set the service after acquiring the token, - // but this token is directly provided to the AMs - SecurityUtil.setTokenService(appMasterToken, serviceAddr); - - // Add the ApplicationMaster token - credentials.addToken(appMasterToken.getService(), appMasterToken); + // Add application token + Token applicationToken = + application.getApplicationToken(); + if(applicationToken != null) { + credentials.addToken(applicationToken.getService(), applicationToken); + } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); - container.setContainerTokens( - ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + container.setContainerTokens(ByteBuffer.wrap(dob.getData(), 0, + dob.getLength())); SecretKey clientSecretKey = this.rmContext.getClientToAMTokenSecretManager().getMasterKey( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index c4990da..03154b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -31,13 +31,15 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -114,8 +116,17 @@ public synchronized RMState loadState() throws Exception { ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( ApplicationAttemptStateDataProto.parseFrom(childData)); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + Credentials credentials = null; + if(attemptStateData.getAppAttemptTokens() != null){ + credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); + // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); attempts.add(attemptState); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5fb1167..dd6fab5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -23,10 +23,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -79,8 +81,16 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr, throws Exception { ApplicationAttemptId attemptId = ConverterUtils .toApplicationAttemptId(attemptIdStr); - ApplicationAttemptState attemptState = new ApplicationAttemptState( - attemptId, attemptStateData.getMasterContainer()); + Credentials credentials = null; + if(attemptStateData.getAppAttemptTokens() != null){ + DataInputByteBuffer dibb = new DataInputByteBuffer(); + credentials = new Credentials(); + dibb.reset(attemptStateData.getAppAttemptTokens()); + credentials.readTokenStorageStream(dibb); + } + ApplicationAttemptState attemptState = + new ApplicationAttemptState(attemptId, + attemptStateData.getMasterContainer(), credentials); ApplicationState appState = state.getApplicationState().get( attemptState.getAttemptId().getApplicationId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index db04495..29bdbb0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -20,8 +20,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @Unstable public class NullRMStateStore extends RMStateStore { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index d5c5015..c8e0d9c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; @@ -26,6 +28,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,8 +39,10 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -58,19 +65,25 @@ public static class ApplicationAttemptState { final ApplicationAttemptId attemptId; final Container masterContainer; - + final Credentials appAttemptTokens; + public ApplicationAttemptState(ApplicationAttemptId attemptId, - Container masterContainer) { + Container masterContainer, + Credentials appAttemptTokens) { this.attemptId = attemptId; this.masterContainer = masterContainer; + this.appAttemptTokens = appAttemptTokens; } - + public Container getMasterContainer() { return masterContainer; } public ApplicationAttemptId getAttemptId() { return attemptId; } + public Credentials getAppAttemptTokens() { + return appAttemptTokens; + } } /** @@ -199,10 +212,23 @@ protected abstract void storeApplicationState(String appId, * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt */ public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + Credentials credentials = new Credentials(); + Token appToken = appAttempt.getApplicationToken(); + if(appToken != null){ + credentials.addToken(appToken.getService(), appToken); + } + Token clientToken = appAttempt.getClientToken(); + if(clientToken != null){ + credentials.addToken(clientToken.getService(), clientToken); + LOG.info(clientToken); + } + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials); + dispatcher.getEventHandler().handle( - new RMStateStoreAppAttemptEvent(attemptState)); + new RMStateStoreAppAttemptEvent(attemptState)); } /** @@ -226,8 +252,19 @@ public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getApplicationSubmissionContext()); for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { - ApplicationAttemptState attemptState = new ApplicationAttemptState( - appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + Credentials credentials = new Credentials(); + Token appToken = appAttempt.getApplicationToken(); + if(appToken != null){ + credentials.addToken(appToken.getService(), appToken); + } + Token clientToken = appAttempt.getClientToken(); + if(clientToken != null){ + credentials.addToken(clientToken.getService(), clientToken); + } + + ApplicationAttemptState attemptState = + new ApplicationAttemptState(appAttempt.getAppAttemptId(), + appAttempt.getMasterContainer(), credentials); appState.attempts.put(attemptState.getAttemptId(), attemptState); } @@ -283,13 +320,27 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) { ApplicationAttemptState attemptState = ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); Exception storedException = null; - ApplicationAttemptStateDataPBImpl attemptStateData = - new ApplicationAttemptStateDataPBImpl(); - attemptStateData.setAttemptId(attemptState.getAttemptId()); - attemptStateData.setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + Credentials credentials = attemptState.getAppAttemptTokens(); + ByteBuffer appAttemptTokens = null; + if(credentials != null){ + DataOutputBuffer dob = new DataOutputBuffer(); + try { + credentials.writeTokenStorageToStream(dob); + } catch (IOException e) { + LOG.warn("Error converting credentials to bytebuffer" + + " while storing attempt: " + attemptState.getAttemptId()); + } + appAttemptTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + ApplicationAttemptStateDataPBImpl attemptStateData = + (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl + .newApplicationAttemptStateData(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens); + try { + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); storeApplicationAttemptState(attemptState.getAttemptId().toString(), attemptStateData); } catch (Exception e) { @@ -358,7 +409,5 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, public void handle(RMStateStoreEvent event) { handleStoreEvent(event); } - } - -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java index 64e3ccb..2622b0e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -49,4 +51,14 @@ public Container getMasterContainer(); public void setMasterContainer(Container container); + + /** + * The application attempt tokens that belong to this attempt + * @return The application attempt tokens that belong to this attempt + */ + @Public + @Unstable + public ByteBuffer getAppAttemptTokens(); + + public void setAppAttemptTokens(ByteBuffer attemptTokens); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java deleted file mode 100644 index d033f5c..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; - -public class ApplicationAttemptStateDataPBImpl -extends ProtoBase -implements ApplicationAttemptStateData { - - ApplicationAttemptStateDataProto proto = - ApplicationAttemptStateDataProto.getDefaultInstance(); - ApplicationAttemptStateDataProto.Builder builder = null; - boolean viaProto = false; - - private ApplicationAttemptId attemptId = null; - private Container masterContainer = null; - - public ApplicationAttemptStateDataPBImpl() { - builder = ApplicationAttemptStateDataProto.newBuilder(); - } - - public ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto proto) { - this.proto = proto; - viaProto = true; - } - - public ApplicationAttemptStateDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void mergeLocalToBuilder() { - if (this.attemptId != null) { - builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto()); - } - if(this.masterContainer != null) { - builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); - } - } - - private void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ApplicationAttemptStateDataProto.newBuilder(proto); - } - viaProto = false; - } - - @Override - public ApplicationAttemptId getAttemptId() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(attemptId != null) { - return attemptId; - } - if (!p.hasAttemptId()) { - return null; - } - attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId()); - return attemptId; - } - - @Override - public void setAttemptId(ApplicationAttemptId attemptId) { - maybeInitBuilder(); - if (attemptId == null) { - builder.clearAttemptId(); - } - this.attemptId = attemptId; - } - - @Override - public Container getMasterContainer() { - ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(masterContainer != null) { - return masterContainer; - } - if (!p.hasMasterContainer()) { - return null; - } - masterContainer = new ContainerPBImpl(p.getMasterContainer()); - return masterContainer; - } - - @Override - public void setMasterContainer(Container container) { - maybeInitBuilder(); - if (container == null) { - builder.clearMasterContainer(); - } - this.masterContainer = container; - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java deleted file mode 100644 index 0aa64b7..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; - -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; - -public class ApplicationStateDataPBImpl -extends ProtoBase -implements ApplicationStateData { - - ApplicationStateDataProto proto = - ApplicationStateDataProto.getDefaultInstance(); - ApplicationStateDataProto.Builder builder = null; - boolean viaProto = false; - - private ApplicationSubmissionContext applicationSubmissionContext = null; - - public ApplicationStateDataPBImpl() { - builder = ApplicationStateDataProto.newBuilder(); - } - - public ApplicationStateDataPBImpl( - ApplicationStateDataProto proto) { - this.proto = proto; - viaProto = true; - } - - public ApplicationStateDataProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - private void mergeLocalToBuilder() { - if (this.applicationSubmissionContext != null) { - builder.setApplicationSubmissionContext( - ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) - .getProto()); - } - } - - private void mergeLocalToProto() { - if (viaProto) - maybeInitBuilder(); - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = ApplicationStateDataProto.newBuilder(proto); - } - viaProto = false; - } - - @Override - public long getSubmitTime() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasSubmitTime()) { - return -1; - } - return (p.getSubmitTime()); - } - - @Override - public void setSubmitTime(long submitTime) { - maybeInitBuilder(); - builder.setSubmitTime(submitTime); - } - - @Override - public ApplicationSubmissionContext getApplicationSubmissionContext() { - ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; - if(applicationSubmissionContext != null) { - return applicationSubmissionContext; - } - if (!p.hasApplicationSubmissionContext()) { - return null; - } - applicationSubmissionContext = - new ApplicationSubmissionContextPBImpl( - p.getApplicationSubmissionContext()); - return applicationSubmissionContext; - } - - @Override - public void setApplicationSubmissionContext( - ApplicationSubmissionContext context) { - maybeInitBuilder(); - if (context == null) { - builder.clearApplicationSubmissionContext(); - } - this.applicationSubmissionContext = context; - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java new file mode 100644 index 0000000..67e434e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; + +public class ApplicationAttemptStateDataPBImpl +extends ProtoBase +implements ApplicationAttemptStateData { + private static final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + ApplicationAttemptStateDataProto proto = + ApplicationAttemptStateDataProto.getDefaultInstance(); + ApplicationAttemptStateDataProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationAttemptId attemptId = null; + private Container masterContainer = null; + private ByteBuffer appAttemptTokens = null; + + public ApplicationAttemptStateDataPBImpl() { + builder = ApplicationAttemptStateDataProto.newBuilder(); + } + + public ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto proto) { + this.proto = proto; + viaProto = true; + } + + public ApplicationAttemptStateDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.attemptId != null) { + builder.setAttemptId(((ApplicationAttemptIdPBImpl)attemptId).getProto()); + } + if(this.masterContainer != null) { + builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); + } + if(this.appAttemptTokens != null) { + builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationAttemptStateDataProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationAttemptId getAttemptId() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(attemptId != null) { + return attemptId; + } + if (!p.hasAttemptId()) { + return null; + } + attemptId = new ApplicationAttemptIdPBImpl(p.getAttemptId()); + return attemptId; + } + + @Override + public void setAttemptId(ApplicationAttemptId attemptId) { + maybeInitBuilder(); + if (attemptId == null) { + builder.clearAttemptId(); + } + this.attemptId = attemptId; + } + + @Override + public Container getMasterContainer() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(masterContainer != null) { + return masterContainer; + } + if (!p.hasMasterContainer()) { + return null; + } + masterContainer = new ContainerPBImpl(p.getMasterContainer()); + return masterContainer; + } + + @Override + public void setMasterContainer(Container container) { + maybeInitBuilder(); + if (container == null) { + builder.clearMasterContainer(); + } + this.masterContainer = container; + } + + @Override + public ByteBuffer getAppAttemptTokens() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(appAttemptTokens != null) { + return appAttemptTokens; + } + if(!p.hasAppAttemptTokens()) { + return null; + } + this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + return appAttemptTokens; + } + + @Override + public void setAppAttemptTokens(ByteBuffer attemptTokens) { + maybeInitBuilder(); + if(attemptTokens == null) { + builder.clearAppAttemptTokens(); + } + this.appAttemptTokens = attemptTokens; + } + + public static ApplicationAttemptStateData newApplicationAttemptStateData( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens) { + ApplicationAttemptStateData attemptStateData = + recordFactory.newRecordInstance(ApplicationAttemptStateData.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + return attemptStateData; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java new file mode 100644 index 0000000..c95d6e1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; + +public class ApplicationStateDataPBImpl +extends ProtoBase +implements ApplicationStateData { + + ApplicationStateDataProto proto = + ApplicationStateDataProto.getDefaultInstance(); + ApplicationStateDataProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationSubmissionContext applicationSubmissionContext = null; + + public ApplicationStateDataPBImpl() { + builder = ApplicationStateDataProto.newBuilder(); + } + + public ApplicationStateDataPBImpl( + ApplicationStateDataProto proto) { + this.proto = proto; + viaProto = true; + } + + public ApplicationStateDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.applicationSubmissionContext != null) { + builder.setApplicationSubmissionContext( + ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) + .getProto()); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ApplicationStateDataProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getSubmitTime() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasSubmitTime()) { + return -1; + } + return (p.getSubmitTime()); + } + + @Override + public void setSubmitTime(long submitTime) { + maybeInitBuilder(); + builder.setSubmitTime(submitTime); + } + + @Override + public ApplicationSubmissionContext getApplicationSubmissionContext() { + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + if(applicationSubmissionContext != null) { + return applicationSubmissionContext; + } + if (!p.hasApplicationSubmissionContext()) { + return null; + } + applicationSubmissionContext = + new ApplicationSubmissionContextPBImpl( + p.getApplicationSubmissionContext()); + return applicationSubmissionContext; + } + + @Override + public void setApplicationSubmissionContext( + ApplicationSubmissionContext context) { + maybeInitBuilder(); + if (context == null) { + builder.clearApplicationSubmissionContext(); + } + this.applicationSubmissionContext = context; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 874232b..7965fe2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -443,7 +445,14 @@ public ApplicationReport createAndGetApplicationReport(boolean allowAccess) { currentApplicationAttemptId = this.currentAttempt.getAppAttemptId(); trackingUrl = this.currentAttempt.getTrackingUrl(); origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl(); - clientToken = this.currentAttempt.getClientToken(); + Token attemptClientToken = + this.currentAttempt.getClientToken(); + if (attemptClientToken != null) { + clientToken = + BuilderUtils.newClientToken(attemptClientToken.getIdentifier(), + attemptClientToken.getKind().toString(), attemptClientToken + .getPassword(), attemptClientToken.getService().toString()); + } host = this.currentAttempt.getHost(); rpcPort = this.currentAttempt.getRpcPort(); appUsageReport = currentAttempt.getApplicationResourceUsageReport(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 510c4fe..35071a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -21,16 +21,18 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ClientToken; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; /** @@ -92,7 +94,7 @@ * The token required by the clients to talk to the application attempt * @return the token required by the clients to talk to the application attempt */ - ClientToken getClientToken(); + Token getClientToken(); /** * Diagnostics information for the application attempt. @@ -147,6 +149,12 @@ ApplicationSubmissionContext getSubmissionContext(); /** + * The tokens belonging to this app attempt + * @return The tokens belonging to this app attempt + */ + Token getApplicationToken(); + + /** * Get application container and resource usage information. * @return an ApplicationResourceUsageReport object. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index eaa15f5..2ea2445 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pjoin; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -38,6 +39,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; @@ -58,7 +62,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.ApplicationTokenSelector; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenSelector; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; @@ -123,8 +130,9 @@ private final WriteLock writeLock; private final ApplicationAttemptId applicationAttemptId; - private ClientToken clientToken; + private Token clientToken; private final ApplicationSubmissionContext submissionContext; + private Token applicationToken = null; //nodes on while this attempt's containers ran private final Set ranNodes = @@ -366,19 +374,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, this.scheduler = scheduler; this.masterService = masterService; - if (UserGroupInformation.isSecurityEnabled()) { - - this.rmContext.getClientToAMTokenSecretManager().registerApplication( - appAttemptId); - - Token token = - new Token(new ClientTokenIdentifier( - appAttemptId), this.rmContext.getClientToAMTokenSecretManager()); - this.clientToken = - BuilderUtils.newClientToken(token.getIdentifier(), token.getKind() - .toString(), token.getPassword(), token.getService().toString()); - } - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -502,11 +497,16 @@ private void setTrackingUrlToRMAppPage() { } @Override - public ClientToken getClientToken() { + public Token getClientToken() { return this.clientToken; } @Override + public Token getApplicationToken() { + return this.applicationToken; + } + + @Override public String getDiagnostics() { this.readLock.lock(); @@ -657,14 +657,42 @@ public void recover(RMState state) { ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; setMasterContainer(attemptState.getMasterContainer()); - LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + recoverAppAttemptTokens(attemptState.getAppAttemptTokens()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + " AttemptId: " + getAppAttemptId() + " MasterContainer: " + masterContainer); setDiagnostics("Attempt recovered after RM restart"); handle(new RMAppAttemptEvent(getAppAttemptId(), RMAppAttemptEventType.RECOVER)); } - + + private void recoverAppAttemptTokens(Credentials appAttemptTokens) { + if (appAttemptTokens == null) { + return; + } + if (UserGroupInformation.isSecurityEnabled()) { + + ClientTokenSelector clientTokenSelector = new ClientTokenSelector(); + this.clientToken = + clientTokenSelector.selectToken(new Text(), + appAttemptTokens.getAllTokens()); + + InetSocketAddress serviceAddr = conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + ApplicationTokenSelector appTokenSelector = new ApplicationTokenSelector(); + this.applicationToken = + appTokenSelector.selectToken( + SecurityUtil.buildTokenService(serviceAddr), + appAttemptTokens.getAllTokens()); + + // For now, no need to populate tokens back to + // ApplicationTokenSecretManager. + // Later in work-preserve restart, we'll create NEW->RUNNING transition + // in which the restored tokens will be added to the secret manager + } + } private static class BaseTransition implements SingleArcTransition { @@ -686,6 +714,36 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); + if (UserGroupInformation.isSecurityEnabled()) { + + appAttempt.rmContext.getClientToAMTokenSecretManager() + .registerApplication(appAttempt.applicationAttemptId); + + // create clientToken + appAttempt.clientToken = + new Token(new ClientTokenIdentifier( + appAttempt.applicationAttemptId), + appAttempt.rmContext.getClientToAMTokenSecretManager()); + + // create application token + ApplicationTokenIdentifier id = + new ApplicationTokenIdentifier(appAttempt.applicationAttemptId); + Token applicationToken = + new Token(id, + appAttempt.rmContext.getApplicationTokenSecretManager()); + InetSocketAddress serviceAddr = + appAttempt.conf.getSocketAddr( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + // normally the client should set the service after acquiring the + // token, but this token is directly provided to the AMs + SecurityUtil.setTokenService(applicationToken, serviceAddr); + + appAttempt.applicationToken = applicationToken; + + } + // Add the application to the scheduler appAttempt.eventHandler.handle( new AppAddedSchedulerEvent(appAttempt.applicationAttemptId, @@ -992,7 +1050,6 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.rmContext.getAMFinishingMonitor().unregister( appAttempt.getAppAttemptId()); - // Unregister from the ClientTokenSecretManager if (UserGroupInformation.isSecurityEnabled()) { appAttempt.rmContext.getClientToAMTokenSecretManager() @@ -1191,7 +1248,7 @@ public long getStartTime() { this.readLock.unlock(); } } - + private void launchAttempt(){ // Send event to launch the AM Container eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index fded9fb..30ae6ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -404,7 +404,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { } @Test - public void testTokenRestoredOnRMrestart() throws Exception { + public void testDelegationTokenRestoredInDelegationTokenRenewer() + throws Exception { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); ExitUtil.disableSystemExit(); @@ -423,7 +424,7 @@ public void testTokenRestoredOnRMrestart() throws Exception { Map rmAppState = rmState.getApplicationState(); - MockRM rm1 = new MyMockRM(conf, memStore); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); rm1.start(); HashSet> tokenSet = @@ -461,21 +462,26 @@ public void testTokenRestoredOnRMrestart() throws Exception { ApplicationState appState = rmAppState.get(app.getApplicationId()); Assert.assertNotNull(appState); + // assert delegation tokens exist in rm1 DelegationTokenRenewr + Assert.assertEquals(tokenSet, rm1.getRMContext() + .getDelegationTokenRenewer().getDelegationTokens()); + // assert delegation tokens are saved DataOutputBuffer dob = new DataOutputBuffer(); ts.writeTokenStorageToStream(dob); ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + securityTokens.rewind(); Assert.assertEquals(securityTokens, appState .getApplicationSubmissionContext().getAMContainerSpec() .getContainerTokens()); // start new RM - MockRM rm2 = new MyMockRM(conf, memStore); + MockRM rm2 = new TestSecurityMockRM(conf, memStore); rm2.start(); - // verify tokens are properly populated back to DelegationTokenRenewer - Assert.assertEquals(tokenSet, rm1.getRMContext() + // verify tokens are properly populated back to rm2 DelegationTokenRenewer + Assert.assertEquals(tokenSet, rm2.getRMContext() .getDelegationTokenRenewer().getDelegationTokens()); // stop the RM @@ -483,9 +489,92 @@ public void testTokenRestoredOnRMrestart() throws Exception { rm2.stop(); } - class MyMockRM extends MockRM { + @Test + public void testAppAttemptTokensRestoredOnRMRestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new TestSecurityMockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submit an app + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), "default"); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app1.getApplicationId()); + Assert.assertNotNull(appState); + + // Allocate the AM + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + + // assert attempt info is saved + ApplicationAttemptState attemptState = appState.getAttempt(attemptId1); + Assert.assertNotNull(attemptState); + Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), + attemptState.getMasterContainer().getId()); + + // the appToken and clientToken that are generated when RMAppAttempt is created, + HashSet> tokenSet = new HashSet>(); + tokenSet.add(attempt1.getApplicationToken()); + tokenSet.add(attempt1.getClientToken()); + + // assert application Token is saved + HashSet> savedTokens = new HashSet>(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + Assert.assertEquals(tokenSet, savedTokens); + + // start new RM + MockRM rm2 = new TestSecurityMockRM(conf, memStore); + rm2.start(); + + RMApp loadedApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = loadedApp1.getRMAppAttempt(attemptId1); + + // assert loaded attempt recovered attempt tokens + Assert.assertNotNull(loadedAttempt1); + savedTokens.clear(); + savedTokens.add(loadedAttempt1.getApplicationToken()); + savedTokens.add(loadedAttempt1.getClientToken()); + Assert.assertEquals(tokenSet, savedTokens); + + // assert clientToken is recovered back to api-versioned clientToken + Assert.assertEquals(attempt1.getClientToken(), + loadedAttempt1.getClientToken()); + + // Not testing ApplicationTokenSecretManager has the password populated back, + // that is needed in work-preserving restart + + rm1.stop(); + rm2.stop(); + } + + class TestSecurityMockRM extends MockRM { - public MyMockRM(Configuration conf, RMStateStore store) { + public TestSecurityMockRM(Configuration conf, RMStateStore store) { super(conf, store); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java index 440908f..a245e54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java @@ -18,14 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; -import org.junit.Test; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,6 +39,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -44,13 +51,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Test; public class TestRMStateStore { @@ -141,7 +153,7 @@ public void addOrphanAttemptIfNeeded(RMStateStore testStore, ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId( "appattempt_1352994193343_0003_000001"); storeAttempt(testStore, attemptId, - "container_1352994193343_0003_01_000001", dispatcher); + "container_1352994193343_0003_01_000001", null, null, dispatcher); } @Override @@ -186,14 +198,17 @@ void storeApp(RMStateStore store, ApplicationId appId, long time) } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, - String containerIdStr, TestDispatcher dispatcher) - throws Exception { + String containerIdStr, Token appToken, + Token clientToken, TestDispatcher dispatcher) + throws Exception { Container container = new ContainerPBImpl(); container.setId(ConverterUtils.toContainerId(containerIdStr)); RMAppAttempt mockAttempt = mock(RMAppAttempt.class); when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getApplicationToken()).thenReturn(appToken); + when(mockAttempt.getClientToken()).thenReturn(clientToken); dispatcher.attemptId = attemptId; dispatcher.storedException = null; store.storeApplicationAttempt(mockAttempt); @@ -201,30 +216,58 @@ ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, return container.getId(); } + @SuppressWarnings("unchecked") void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { long submitTime = System.currentTimeMillis(); + Configuration conf = new YarnConfiguration(); RMStateStore store = stateStoreHelper.getRMStateStore(); TestDispatcher dispatcher = new TestDispatcher(); store.setDispatcher(dispatcher); + ApplicationTokenSecretManager appTokenMgr = + new ApplicationTokenSecretManager(conf); + ClientToAMTokenSecretManagerInRM clientTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + ApplicationAttemptId attemptId1 = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0001_000001"); ApplicationId appId1 = attemptId1.getApplicationId(); storeApp(store, appId1, submitTime); + + // create application token1 for attempt1 + List> appAttemptToken1 = + generateTokens(attemptId1, appTokenMgr, clientTokenMgr, conf); + HashSet> attemptTokenSet1 = new HashSet>(); + attemptTokenSet1.addAll(appAttemptToken1); + ContainerId containerId1 = storeAttempt(store, attemptId1, - "container_1352994193343_0001_01_000001", dispatcher); + "container_1352994193343_0001_01_000001", + (Token) (appAttemptToken1.get(0)), + (Token)(appAttemptToken1.get(1)), + dispatcher); + String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002"; ApplicationAttemptId attemptId2 = - ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + ConverterUtils.toApplicationAttemptId(appAttemptIdStr2); + + // create application token2 for attempt2 + List> appAttemptToken2 = + generateTokens(attemptId2, appTokenMgr, clientTokenMgr, conf); + HashSet> attemptTokenSet2 = new HashSet>(); + attemptTokenSet2.addAll(appAttemptToken2); + ContainerId containerId2 = storeAttempt(store, attemptId2, - "container_1352994193343_0001_02_000001", dispatcher); + "container_1352994193343_0001_02_000001", + (Token) (appAttemptToken2.get(0)), + (Token)(appAttemptToken2.get(1)), + dispatcher); ApplicationAttemptId attemptIdRemoved = ConverterUtils .toApplicationAttemptId("appattempt_1352994193343_0002_000001"); ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId(); storeApp(store, appIdRemoved, submitTime); storeAttempt(store, attemptIdRemoved, - "container_1352994193343_0002_01_000001", dispatcher); + "container_1352994193343_0002_01_000001", null, null, dispatcher); RMApp mockRemovedApp = mock(RMApp.class); HashMap attempts = @@ -268,12 +311,21 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { assertEquals(attemptId1, attemptState.getAttemptId()); // attempt1 container is loaded correctly assertEquals(containerId1, attemptState.getMasterContainer().getId()); + // attempt1 applicationToken is loaded correctly + HashSet> savedTokens = new HashSet>(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + assertEquals(attemptTokenSet1, savedTokens); + attemptState = appState.getAttempt(attemptId2); // attempt2 is loaded correctly assertNotNull(attemptState); assertEquals(attemptId2, attemptState.getAttemptId()); // attempt2 container is loaded correctly assertEquals(containerId2, attemptState.getMasterContainer().getId()); + // attempt2 applicationToken is loaded correctly + savedTokens.clear(); + savedTokens.addAll(attemptState.getAppAttemptTokens().getAllTokens()); + assertEquals(attemptTokenSet2, savedTokens); // assert store is in expected state after everything is cleaned assertTrue(stateStoreHelper.isFinalStateValid()); @@ -281,4 +333,23 @@ void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception { store.close(); } + private List> generateTokens(ApplicationAttemptId attemptId, + ApplicationTokenSecretManager appTokenMgr, + ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) { + ApplicationTokenIdentifier appTokenId = + new ApplicationTokenIdentifier(attemptId); + Token appToken = + new Token(appTokenId, appTokenMgr); + appToken.setService(new Text("appToken service")); + + ClientTokenIdentifier clientTokenId = new ClientTokenIdentifier(attemptId); + clientTokenMgr.registerApplication(attemptId); + Token clientToken = + new Token(clientTokenId, clientTokenMgr); + clientToken.setService(new Text("clientToken service")); + List> tokenPair = new ArrayList>(); + tokenPair.add(0, appToken); + tokenPair.add(1, clientToken); + return tokenPair; + } }