diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 46ac642..607c199 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -120,6 +120,25 @@ public static AllocateResponse newInstance(int responseId, response.setAMRMToken(amRMToken); return response; } + + @Public + @Unstable + public static AllocateResponse newInstance(int responseId, + List completedContainers, + List allocatedContainers, List updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List nmTokens, Token amRMToken, + List increasedContainers, + List decreasedContainers, + String aggregatorAddr) { + AllocateResponse response = + newInstance(responseId, completedContainers, allocatedContainers, + updatedNodes, availResources, command, numClusterNodes, preempt, + nmTokens, increasedContainers, decreasedContainers); + response.setAMRMToken(amRMToken); + response.setAggregatorAddr(aggregatorAddr); + return response; + } /** * If the ResourceManager needs the @@ -302,4 +321,18 @@ public abstract void setDecreasedContainers( @Private @Unstable public abstract void setAMRMToken(Token amRMToken); + + /** + * The address of aggregator that belong to this app + * + * @return The address of aggregator that belong to this attempt + */ + @Public + @Unstable + public abstract String getAggregatorAddr(); + + @Private + @Unstable + public abstract void setAggregatorAddr(String aggregatorAddr); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2720143..2e9ccbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -643,6 +643,11 @@ private static void addDeprecatedKeys() { NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; + /** Number of threads aggregator service uses.*/ + public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT = + NM_PREFIX + "aggregator-service.thread-count"; + public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5; + /** Number of threads used in cleanup.*/ public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; @@ -670,6 +675,13 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; + /** Address where the aggregator service IPC is.*/ + public static final String NM_AGGREGATOR_SERVICE_ADDRESS = + NM_PREFIX + "aggregator-service.address"; + public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048; + public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = + "0.0.0.0:" + DEFAULT_NM_AGGREGATOR_SERVICE_PORT; + /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 33d1207..4ae4806 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -87,6 
+87,7 @@ message AllocateResponseProto { repeated ContainerResourceIncreaseProto increased_containers = 10; repeated ContainerResourceDecreaseProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; + optional string aggregator_addr = 13; } enum SchedulerResourceTypes { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index db49166..3a19ac2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -38,6 +38,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -100,6 +103,7 @@ import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * An ApplicationMaster for executing shell commands on a set of launched @@ -209,6 +213,13 @@ private String appMasterTrackingUrl = ""; private boolean newTimelineService = false; + + // For posting entities in new timeline service in a non-blocking way + // TODO replace with event loop in TimelineClient. 
+ private static ExecutorService threadPool = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("TimelineService #%d") + .build()); // App Master configuration // No. of containers to run shell command on @@ -291,6 +302,19 @@ public static void main(String[] args) { } appMaster.run(); result = appMaster.finish(); + + threadPool.shutdown(); + + while (!threadPool.isTerminated()) { // wait for all posting thread to finish + try { + if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); // send interrupt to hurry them along + } + } catch (InterruptedException e) { + LOG.warn("Timeline client service stop interrupted!"); + break; + } + } } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); LogManager.shutdown(); @@ -515,8 +539,12 @@ public boolean init(String[] args) throws ParseException, IOException { } } // Creating the Timeline Client - timelineClient = TimelineClient.createTimelineClient( - appAttemptID.getApplicationId()); + if (newTimelineService) { + timelineClient = TimelineClient.createTimelineClient( + appAttemptID.getApplicationId()); + } else { + timelineClient = TimelineClient.createTimelineClient(); + } timelineClient.init(conf); timelineClient.start(); } else { @@ -589,8 +617,10 @@ public void run() throws YarnException, IOException { AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); + // need to bind timelineClient before start. 
+ amRMClient.registerTimelineClient(timelineClient); amRMClient.start(); - + containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); @@ -727,7 +757,7 @@ protected boolean finish() { if(timelineClient != null) { timelineClient.stop(); } - + return success; } @@ -1192,6 +1222,18 @@ public TimelinePutResponse run() throws Exception { } private static void publishContainerStartEventOnNewTimelineService( + final TimelineClient timelineClient, final Container container, + final String domainId, final UserGroupInformation ugi) { + Runnable publishWrapper = new Runnable() { + public void run() { + publishContainerStartEventOnNewTimelineServiceBase(timelineClient, + container, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishContainerStartEventOnNewTimelineServiceBase( final TimelineClient timelineClient, Container container, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = @@ -1223,10 +1265,22 @@ public TimelinePutResponse run() throws Exception { e instanceof UndeclaredThrowableException ? 
e.getCause() : e); } } - + private static void publishContainerEndEventOnNewTimelineService( - final TimelineClient timelineClient, ContainerStatus container, - String domainId, UserGroupInformation ugi) { + final TimelineClient timelineClient, final ContainerStatus container, + final String domainId, final UserGroupInformation ugi) { + Runnable publishWrapper = new Runnable() { + public void run() { + publishContainerEndEventOnNewTimelineServiceBase(timelineClient, + container, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishContainerEndEventOnNewTimelineServiceBase( + final TimelineClient timelineClient, final ContainerStatus container, + final String domainId, final UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity(); entity.setId(container.getContainerId().toString()); @@ -1257,6 +1311,20 @@ public TimelinePutResponse run() throws Exception { } private static void publishApplicationAttemptEventOnNewTimelineService( + final TimelineClient timelineClient, final String appAttemptId, + final DSEvent appEvent, final String domainId, + final UserGroupInformation ugi) { + + Runnable publishWrapper = new Runnable() { + public void run() { + publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, + appAttemptId, appEvent, domainId, ugi); + } + }; + threadPool.execute(publishWrapper); + } + + private static void publishApplicationAttemptEventOnNewTimelineServiceBase( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index 9923806..cb737b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -47,6 +47,8 @@ public abstract class AMRMClient extends AbstractService { private static final Log LOG = LogFactory.getLog(AMRMClient.class); + + private TimelineClient timelineClient; /** * Create a new instance of AMRMClient. @@ -374,6 +376,22 @@ public NMTokenCache getNMTokenCache() { } /** + * Register TimelineClient to AMRMClient. + * @param timelineClient + */ + public void registerTimelineClient(TimelineClient timelineClient) { + this.timelineClient = timelineClient; + } + + /** + * Get registered timeline client. + * @return + */ + public TimelineClient getRegisteredTimeineClient() { + return this.timelineClient; + } + + /** * Wait for check to return true for each 1000 ms. 
* See also {@link #waitFor(com.google.common.base.Supplier, int)} * and {@link #waitFor(com.google.common.base.Supplier, int, int)} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index f62e71b..be5610e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; @@ -193,6 +194,22 @@ public abstract void unregisterApplicationMaster( * @return Current number of nodes in the cluster */ public abstract int getClusterNodeCount(); + + /** + * Register TimelineClient to AMRMClient. + * @param timelineClient + */ + public void registerTimelineClient(TimelineClient timelineClient) { + client.registerTimelineClient(timelineClient); + } + + /** + * Get registered timeline client. + * @return + */ + public TimelineClient getRegisteredTimeineClient() { + return client.getRegisteredTimeineClient(); + } /** * Update application's blacklist with addition or removal resources. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index addc3b6..f0f0bc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -65,6 +66,8 @@ private volatile boolean keepRunning; private volatile float progress; + private volatile String aggregatorAddr; + private volatile Throwable savedException; public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) { @@ -304,7 +307,17 @@ public void run() { if (!allocated.isEmpty()) { handler.onContainersAllocated(allocated); } - + + String aggregatorAddress = response.getAggregatorAddr(); + TimelineClient timelineClient = client.getRegisteredTimeineClient(); + if (timelineClient != null && aggregatorAddress != null + && !aggregatorAddress.isEmpty()) { + if (aggregatorAddr == null || + !aggregatorAddr.equals(aggregatorAddress)) { + aggregatorAddr = aggregatorAddress; + timelineClient.setTimelineServiceAddress(aggregatorAddress); + } + } progress = handler.getProgress(); } catch (Throwable ex) { handler.onError(ex); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index f2796fd..605c29a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -384,6 +384,23 @@ public synchronized void setAMRMToken(Token amRMToken) { } this.amrmToken = amRMToken; } + + + @Override + public String getAggregatorAddr() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + return p.getAggregatorAddr(); + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + maybeInitBuilder(); + if (aggregatorAddr == null) { + builder.clearAggregatorAddr(); + return; + } + builder.setAggregatorAddr(aggregatorAddr); + } private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index d40ad7c..5db347e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -42,7 +42,6 @@ public abstract class TimelineClient extends AbstractService { protected ApplicationId contextAppId; - protected String timelineServiceAddress; @Public public static TimelineClient createTimelineClient() { @@ -185,7 +184,6 @@ public abstract void 
putEntitiesAsync( * @param address * the timeline service address */ - public void setTimelineServiceAddress(String address) { - timelineServiceAddress = address; - } + public abstract void setTimelineServiceAddress(String address); + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 22dcc00..407682d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -115,6 +115,15 @@ private DelegationTokenAuthenticatedURL.Token token; private UserGroupInformation authUgi; private String doAsUser; + + private volatile String timelineServiceAddress; + + // Retry parameters for identifying new timeline service + // TODO consider to merge with connection retry + private int maxServiceRetries; + private long serviceRetryInterval; + + private boolean newTimelineService = false; @Private @VisibleForTesting @@ -260,6 +269,7 @@ public TimelineClientImpl() { public TimelineClientImpl(ApplicationId applicationId) { super(TimelineClientImpl.class.getName(), applicationId); + this.newTimelineService = true; } protected void serviceInit(Configuration conf) throws Exception { @@ -287,18 +297,32 @@ protected void serviceInit(Configuration conf) throws Exception { client = new Client(new URLConnectionClientHandler( new TimelineURLConnectionFactory()), cc); TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter(); - client.addFilter(retryFilter); + // TODO need to cleanup filter retry later. 
+ if (!newTimelineService) { + client.addFilter(retryFilter); + } - if (YarnConfiguration.useHttps(conf)) { - timelineServiceAddress = conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS); + // old version timeline service need to get address from configuration + // while new version need to auto discovery (with retry). + if (newTimelineService) { + maxServiceRetries = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + serviceRetryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); } else { - timelineServiceAddress = conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS); - } - LOG.info("Timeline service address: " + timelineServiceAddress); + if (YarnConfiguration.useHttps(conf)) { + setTimelineServiceAddress(conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS)); + } else { + setTimelineServiceAddress(conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)); + } + LOG.info("Timeline service address: " + getTimelineServiceAddress()); + } super.serviceInit(conf); } @@ -341,8 +365,7 @@ private void putEntities(boolean async, if (async) { params.add("async", Boolean.TRUE.toString()); } - putObjects(constructResURI(getConfig(), timelineServiceAddress, true), - "entities", params, entitiesContainer); + putObjects("entities", params, entitiesContainer); } @Override @@ -350,6 +373,60 @@ public void putDomain(TimelineDomain domain) throws IOException, YarnException { doPosting(domain, "domain"); } + + // Used for new timeline service only + @Private + public void putObjects(String 
path, MultivaluedMap params, + Object obj) throws IOException, YarnException { + + // timelineServiceAddress could haven't be initialized yet + // or stale (only for new timeline service) + int retries = pollTimelineServiceAddress(this.maxServiceRetries); + + // timelineServiceAddress could be stale, add retry logic here. + boolean needRetry = true; + while (needRetry) { + try { + URI uri = constructResURI(getConfig(), timelineServiceAddress, true); + putObjects(uri, path, params, obj); + needRetry = false; + } + catch (Exception e) { + // TODO only handle exception for timelineServiceAddress being updated. + // skip retry for other exceptions. + checkRetryWithSleep(retries, e); + retries--; + } + } + } + + /** + * Check if reaching to maximum of retries. + * @param retries + * @param e + */ + private void checkRetryWithSleep(int retries, Exception e) throws + YarnException, IOException { + if (retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } else { + LOG.error( + "TimelineClient has reached to max retry times :" + + this.maxServiceRetries + " for service address: " + + timelineServiceAddress); + if (e instanceof YarnException) { + throw (YarnException)e; + } else if (e instanceof IOException) { + throw (IOException)e; + } else { + throw new YarnException(e); + } + } + } private void putObjects( URI base, String path, MultivaluedMap params, Object obj) @@ -410,6 +487,15 @@ public ClientResponse run() throws Exception { } return resp; } + + @Override + public void setTimelineServiceAddress(String address) { + this.timelineServiceAddress = address; + } + + private String getTimelineServiceAddress() { + return this.timelineServiceAddress; + } @SuppressWarnings("unchecked") @Override @@ -424,8 +510,10 @@ public ClientResponse run() throws Exception { DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(authenticator, connConfigurator); + // TODO 
we should add retry logic here if timelineServiceAddress is + // not available immediately. return (Token) authUrl.getDelegationToken( - constructResURI(getConfig(), timelineServiceAddress, false).toURL(), + constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(), token, renewer, doAsUser); } }; @@ -523,6 +611,7 @@ public boolean shouldRetryOn(Exception e) { return connectionRetry.retryOn(tokenRetryOp); } + // Old timeline service, no external retry logic. @Private @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { @@ -540,6 +629,24 @@ public ClientResponse doPostingObject(Object object, String path) { throw new YarnRuntimeException("Unknown resource type"); } } + + /** + * Poll TimelineServiceAddress for maximum of retries times if it is null + * @param retries + * @return the left retry times + */ + private int pollTimelineServiceAddress(int retries) { + while (timelineServiceAddress == null && retries > 0) { + try { + Thread.sleep(this.serviceRetryInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + timelineServiceAddress = getTimelineServiceAddress(); + retries--; + } + return retries; + } private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java index 3aeb33e..e9a0a88 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java @@ -172,7 +172,7 @@ public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf, return getResolvedAddress(address); } - private static String getResolvedAddress(InetSocketAddress 
address) { + public static String getResolvedAddress(InetSocketAddress address) { address = NetUtils.getConnectAddress(address); StringBuilder sb = new StringBuilder(); InetAddress resolved = address.getAddress(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 66400c8..226d8ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -714,6 +714,12 @@ yarn.nodemanager.container-manager.thread-count 20 + + + Number of threads aggregator service uses. + yarn.nodemanager.aggregator-service.thread-count + 5 + Number of threads used in cleanup. @@ -782,6 +788,13 @@ yarn.nodemanager.localizer.address ${yarn.nodemanager.hostname}:8040 + + + + Address where the aggregator service IPC is. + yarn.nodemanager.aggregator-service.address + ${yarn.nodemanager.hostname}:8048 + Interval in between cache cleanups. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java index e2071dd..26d6d04 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -105,7 +106,7 @@ private void testRPCTimeout(String rpcClass) throws Exception { resource, System.currentTimeMillis() + 10000, 42, 42, Priority.newInstance(0), 0); Token containerToken = - TestRPC.newContainerToken(nodeId, "password".getBytes(), + newContainerToken(nodeId, "password".getBytes(), containerTokenIdentifier); StartContainerRequest scRequest = @@ -130,6 +131,19 @@ private void testRPCTimeout(String rpcClass) throws Exception { Assert.fail("timeout exception should have occurred!"); } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = + Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } public class DummyContainerManager implements 
ContainerManagementProtocol { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java deleted file mode 100644 index 39e6162..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ /dev/null @@ -1,247 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.hadoop.yarn; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; 
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.junit.Test; - -public class TestRPC { - - private static final String EXCEPTION_MSG = "test error"; - private static final String EXCEPTION_CAUSE = "exception cause"; - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - @Test - public void testUnknownCall() { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class - .getName()); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - - // Any unrelated protocol would do - ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( - ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); - - try { - proxy.getNewApplication(Records - .newRecord(GetNewApplicationRequest.class)); - Assert.fail("Excepted RPC call to fail with unknown method."); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().matches( - "Unknown method getNewApplication called on.*" - + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" - + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Test - public void testHadoopProtoRPC() throws Exception { - test(HadoopYarnProtoRPC.class.getName()); - } - - private void test(String rpcClass) throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, 
rpcClass); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - Server server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, null, 1); - server.start(); - RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class); - ContainerManagementProtocol proxy = (ContainerManagementProtocol) - rpc.getProxy(ContainerManagementProtocol.class, - NetUtils.getConnectAddress(server), conf); - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - ApplicationId applicationId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId applicationAttemptId = - ApplicationAttemptId.newInstance(applicationId, 0); - ContainerId containerId = - ContainerId.newContainerId(applicationAttemptId, 100); - NodeId nodeId = NodeId.newInstance("localhost", 1234); - Resource resource = Resource.newInstance(1234, 2); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier(containerId, "localhost", "user", - resource, System.currentTimeMillis() + 10000, 42, 42, - Priority.newInstance(0), 0); - Token containerToken = newContainerToken(nodeId, "password".getBytes(), - containerTokenIdentifier); - - StartContainerRequest scRequest = - StartContainerRequest.newInstance(containerLaunchContext, - containerToken); - List list = new ArrayList(); - list.add(scRequest); - StartContainersRequest allRequests = - StartContainersRequest.newInstance(list); - proxy.startContainers(allRequests); - - List containerIds = new ArrayList(); - containerIds.add(containerId); - GetContainerStatusesRequest gcsRequest = - GetContainerStatusesRequest.newInstance(containerIds); - GetContainerStatusesResponse response = - proxy.getContainerStatuses(gcsRequest); - List statuses = response.getContainerStatuses(); - - //test remote exception - boolean exception = false; - try { - 
StopContainersRequest stopRequest = - recordFactory.newRecordInstance(StopContainersRequest.class); - stopRequest.setContainerIds(containerIds); - proxy.stopContainers(stopRequest); - } catch (YarnException e) { - exception = true; - Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); - Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); - System.out.println("Test Exception is " + e.getMessage()); - } catch (Exception ex) { - ex.printStackTrace(); - } - Assert.assertTrue(exception); - - server.stop(); - Assert.assertNotNull(statuses.get(0)); - Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); - } - - public class DummyContainerManager implements ContainerManagementProtocol { - - private List statuses = new ArrayList(); - - @Override - public GetContainerStatusesResponse getContainerStatuses( - GetContainerStatusesRequest request) - throws YarnException { - GetContainerStatusesResponse response = - recordFactory.newRecordInstance(GetContainerStatusesResponse.class); - response.setContainerStatuses(statuses); - return response; - } - - @Override - public StartContainersResponse startContainers( - StartContainersRequest requests) throws YarnException { - StartContainersResponse response = - recordFactory.newRecordInstance(StartContainersResponse.class); - for (StartContainerRequest request : requests.getStartContainerRequests()) { - Token containerToken = request.getContainerToken(); - ContainerTokenIdentifier tokenId = null; - - try { - tokenId = newContainerTokenIdentifier(containerToken); - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); - } - ContainerStatus status = - recordFactory.newRecordInstance(ContainerStatus.class); - status.setState(ContainerState.RUNNING); - status.setContainerId(tokenId.getContainerID()); - status.setExitStatus(0); - statuses.add(status); - - } - return response; - } - - @Override - public StopContainersResponse stopContainers(StopContainersRequest request) - throws YarnException { 
- Exception e = new Exception(EXCEPTION_MSG, - new Exception(EXCEPTION_CAUSE)); - throw new YarnException(e); - } - } - - public static ContainerTokenIdentifier newContainerTokenIdentifier( - Token containerToken) throws IOException { - org.apache.hadoop.security.token.Token token = - new org.apache.hadoop.security.token.Token( - containerToken.getIdentifier() - .array(), containerToken.getPassword().array(), new Text( - containerToken.getKind()), - new Text(containerToken.getService())); - return token.decodeIdentifier(); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = - NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = - Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index fbe9af9..ef0bdcc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -111,4 +111,21 @@ public void testAllocateResponseWithoutIncDecContainers() { Assert.assertEquals(0, r.getIncreasedContainers().size()); Assert.assertEquals(0, r.getDecreasedContainers().size()); } + + @SuppressWarnings("deprecation") + @Test + public void testAllocateResponseWithAggregatorAddress() { + final String aggregatorAddr = "localhost:0"; + AllocateResponse r = + 
AllocateResponse.newInstance(3, new ArrayList(), + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, + null, null, aggregatorAddr); + + AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); + r = new AllocateResponsePBImpl(p); + + // check value + Assert.assertEquals(aggregatorAddr, r.getAggregatorAddr()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 1a4dab8..d1e4acb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -142,6 +142,7 @@ yarn_server_common_service_protos.proto ResourceTracker.proto SCMUploader.proto + aggregatornodemanager_protocol.proto ${project.build.directory}/generated-sources/java diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java new file mode 100644 index 0000000..53bdb4e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; + +/** + *

The protocol between a TimelineAggregatorsCollection and a + * NodeManager to report that a new application aggregator is launched. + *

+ * + */ +@Private +public interface AggregatorNodemanagerProtocol { + + /** + * + *

+ * The TimelineAggregatorsCollection provides a list of mappings + * between applications and aggregators' addresses in + * {@link ReportNewAggregatorsInfoRequest} to a NodeManager to + * register aggregators' info, including: applicationId and REST URI to + * access the aggregator. NodeManager will add them into registered aggregators + * and register them into ResourceManager afterwards. + *

+ * + * @param request the request of registering a new aggregator or a list of aggregators + * @return + * @throws YarnException + * @throws IOException + */ + ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) + throws YarnException, IOException; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java new file mode 100644 index 0000000..4df80a5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService; + +@Private +@Unstable +@ProtocolInfo( + protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB", + protocolVersion = 1) +public interface AggregatorNodemanagerProtocolPB extends + AggregatorNodemanagerProtocolService.BlockingInterface { + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java new file mode 100644 index 0000000..6e777e7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; + +import com.google.protobuf.ServiceException; + +public class AggregatorNodemanagerProtocolPBClientImpl implements + AggregatorNodemanagerProtocol, Closeable { + + // Not a documented config. 
Only used for tests internally + static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX + + "rpc.nm-command-timeout"; + + /** + * Maximum of 1 minute timeout for a Node to react to the command + */ + static final int DEFAULT_COMMAND_TIMEOUT = 60000; + + private AggregatorNodemanagerProtocolPB proxy; + + @Private + public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class, + ProtobufRpcEngine.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); + proxy = + (AggregatorNodemanagerProtocolPB) RPC.getProxy( + AggregatorNodemanagerProtocolPB.class, + clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); + } + + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) throws YarnException, IOException { + + ReportNewAggregatorsInfoRequestProto requestProto = + ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto(); + try { + return new ReportNewAggregatorsInfoResponsePBImpl( + proxy.reportNewAggregatorInfo(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java new file mode 100644 index 0000000..87bce16 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.impl.pb.service; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class AggregatorNodemanagerProtocolPBServiceImpl implements + AggregatorNodemanagerProtocolPB { + + private AggregatorNodemanagerProtocol real; + + public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) { + this.real = impl; + } + + @Override + public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo( + RpcController arg0, ReportNewAggregatorsInfoRequestProto proto) + throws ServiceException { + ReportNewAggregatorsInfoRequestPBImpl request = + new ReportNewAggregatorsInfoRequestPBImpl(proto); + try { + ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request); + return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + +} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index addd3fe..0b020b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -36,6 +40,21 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); return nodeHeartbeatRequest; } + + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, + MasterKey lastKnownContainerTokenMasterKey, + MasterKey lastKnownNMTokenMasterKey, + Map registeredAggregators) { + NodeHeartbeatRequest nodeHeartbeatRequest = + Records.newRecord(NodeHeartbeatRequest.class); + nodeHeartbeatRequest.setNodeStatus(nodeStatus); + nodeHeartbeatRequest + .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); + nodeHeartbeatRequest + .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators); + return nodeHeartbeatRequest; + } public abstract NodeStatus getNodeStatus(); public abstract void setNodeStatus(NodeStatus status); @@ -45,4 +64,8 @@ public static 
NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); + + // This tells RM registered aggregators' address info on this node + public abstract Map getRegisteredAggregators(); + public abstract void setRegisteredAggregators(Map appAggregatorsMap); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 9fb44ca..262ca07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -35,6 +35,10 @@ List getContainersToBeRemovedFromNM(); List getApplicationsToCleanup(); + + // This tells NM the aggregators' address info of related Apps + Map getAppAggregatorsMap(); + void setAppAggregatorsMap(Map appAggregatorsMap); void setResponseId(int responseId); void setNodeAction(NodeAction action); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java new file mode 100644 index 0000000..ae538a2 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.List; +import java.util.Arrays; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.util.Records; + +@Private +public abstract class ReportNewAggregatorsInfoRequest { + + public static ReportNewAggregatorsInfoRequest newInstance( + List appAggregatorsList) { + ReportNewAggregatorsInfoRequest request = + Records.newRecord(ReportNewAggregatorsInfoRequest.class); + request.setAppAggregatorsList(appAggregatorsList); + return request; + } + + public static ReportNewAggregatorsInfoRequest newInstance( + ApplicationId id, String aggregatorAddr) { + ReportNewAggregatorsInfoRequest request = + Records.newRecord(ReportNewAggregatorsInfoRequest.class); + request.setAppAggregatorsList( + Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr))); + return request; + } + + public abstract List getAppAggregatorsList(); + + public abstract void setAppAggregatorsList( + List appAggregatorsList); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java new file mode 100644 index 0000000..3b847d6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.util.Records; + +public abstract class ReportNewAggregatorsInfoResponse { + + @Private + public static ReportNewAggregatorsInfoResponse newInstance() { + ReportNewAggregatorsInfoResponse response = + Records.newRecord(ReportNewAggregatorsInfoResponse.class); + return response; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 26d1f19..39eaabd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,8 +18,16 @@ package 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -36,6 +44,7 @@ private NodeStatus nodeStatus = null; private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; + Map registeredAggregators = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -80,6 +89,20 @@ private void mergeLocalToBuilder() { builder.setLastKnownNmTokenMasterKey( convertToProtoFormat(this.lastKnownNMTokenMasterKey)); } + + if (this.registeredAggregators != null) { + addRegisteredAggregatorsToProto(); + } + } + + private void addRegisteredAggregatorsToProto() { + maybeInitBuilder(); + builder.clearRegisteredAggregators(); + for (Map.Entry entry : registeredAggregators.entrySet()) { + builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppAggregatorAddr(entry.getValue())); + } } private void mergeLocalToProto() { @@ -162,6 +185,36 @@ public void setLastKnownNMTokenMasterKey(MasterKey masterKey) { builder.clearLastKnownNmTokenMasterKey(); this.lastKnownNMTokenMasterKey = masterKey; } + + @Override + public Map getRegisteredAggregators() { + if 
(this.registeredAggregators != null) { + return this.registeredAggregators; + } + initRegisteredAggregators(); + return registeredAggregators; + } + + private void initRegisteredAggregators() { + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRegisteredAggregatorsList(); + this.registeredAggregators = new HashMap (); + for (AppAggregatorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.registeredAggregators.put(appId, c.getAppAggregatorAddr()); + } + } + + @Override + public void setRegisteredAggregators( + Map registeredAggregators) { + if (registeredAggregators == null || registeredAggregators.isEmpty()) { + return; + } + maybeInitBuilder(); + this.registeredAggregators = new HashMap(); + this.registeredAggregators.putAll(registeredAggregators); + } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { return new NodeStatusPBImpl(p); @@ -170,6 +223,14 @@ private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { private NodeStatusProto convertToProtoFormat(NodeStatus t) { return ((NodeStatusPBImpl)t).getProto(); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { return new MasterKeyPBImpl(p); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 630a5bf..019b2ae 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; @@ -55,6 +56,8 @@ private List containersToBeRemovedFromNM = null; private List applicationsToCleanup = null; private Map systemCredentials = null; + + Map appAggregatorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -96,6 +99,10 @@ private void mergeLocalToBuilder() { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + + if (this.appAggregatorsMap != null) { + addAppAggregatorsMapToProto(); + } } private void addSystemCredentialsToProto() { @@ -108,6 +115,16 @@ private void addSystemCredentialsToProto() { entry.getValue().duplicate()))); } } + + private void addAppAggregatorsMapToProto() { + maybeInitBuilder(); + builder.clearAppAggregatorsMap(); + for (Map.Entry entry : appAggregatorsMap.entrySet()) { + builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder() + .setAppId(convertToProtoFormat(entry.getKey())) + .setAppAggregatorAddr(entry.getValue())); + } + } private void mergeLocalToProto() { if (viaProto) 
@@ -417,6 +434,15 @@ public void remove() { initSystemCredentials(); return systemCredentials; } + + @Override + public Map getAppAggregatorsMap() { + if (this.appAggregatorsMap != null) { + return this.appAggregatorsMap; + } + initAppAggregatorsMap(); + return appAggregatorsMap; + } private void initSystemCredentials() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -428,6 +454,16 @@ private void initSystemCredentials() { this.systemCredentials.put(appId, byteBuffer); } } + + private void initAppAggregatorsMap() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getAppAggregatorsMapList(); + this.appAggregatorsMap = new HashMap (); + for (AppAggregatorsMapProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr()); + } + } @Override public void setSystemCredentialsForApps( @@ -439,6 +475,17 @@ public void setSystemCredentialsForApps( this.systemCredentials = new HashMap(); this.systemCredentials.putAll(systemCredentials); } + + @Override + public void setAppAggregatorsMap( + Map appAggregatorsMap) { + if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) { + return; + } + maybeInitBuilder(); + this.appAggregatorsMap = new HashMap(); + this.appAggregatorsMap.putAll(appAggregatorsMap); + } @Override public long getNextHeartBeatInterval() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java new file mode 100644 index 0000000..eb7beef --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl; + +public class ReportNewAggregatorsInfoRequestPBImpl extends + ReportNewAggregatorsInfoRequest { + + ReportNewAggregatorsInfoRequestProto proto = + ReportNewAggregatorsInfoRequestProto.getDefaultInstance(); + + ReportNewAggregatorsInfoRequestProto.Builder builder = null; + boolean viaProto = false; + + private List aggregatorsList = null; + + public ReportNewAggregatorsInfoRequestPBImpl() { + builder = ReportNewAggregatorsInfoRequestProto.newBuilder(); + } + + public ReportNewAggregatorsInfoRequestPBImpl( + ReportNewAggregatorsInfoRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReportNewAggregatorsInfoRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (aggregatorsList != null) { + addLocalAggregatorsToProto(); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void addLocalAggregatorsToProto() { + maybeInitBuilder(); + builder.clearAppAggregators(); + List protoList = + new ArrayList(); + for (AppAggregatorsMap m : this.aggregatorsList) { + protoList.add(convertToProtoFormat(m)); + } + builder.addAllAppAggregators(protoList); + } + + private void initLocalAggregatorsList() { + ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List aggregatorsList = + p.getAppAggregatorsList(); + this.aggregatorsList = new ArrayList(); + for (AppAggregatorsMapProto m : aggregatorsList) { + this.aggregatorsList.add(convertFromProtoFormat(m)); + } + } + + @Override + public List getAppAggregatorsList() { + if (this.aggregatorsList == null) { + initLocalAggregatorsList(); + } + return this.aggregatorsList; + } + + @Override + public void setAppAggregatorsList(List appAggregatorsList) { + maybeInitBuilder(); + if (appAggregatorsList == null) { + builder.clearAppAggregators(); + } + this.aggregatorsList = appAggregatorsList; + } + + private AppAggregatorsMapPBImpl convertFromProtoFormat( + AppAggregatorsMapProto p) { + return new AppAggregatorsMapPBImpl(p); + } + + private AppAggregatorsMapProto convertToProtoFormat( + AppAggregatorsMap m) { + return ((AppAggregatorsMapPBImpl) m).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java new file mode 100644 index 0000000..0f0925a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class ReportNewAggregatorsInfoResponsePBImpl extends + ReportNewAggregatorsInfoResponse { + + ReportNewAggregatorsInfoResponseProto proto = + ReportNewAggregatorsInfoResponseProto.getDefaultInstance(); + + ReportNewAggregatorsInfoResponseProto.Builder builder = null; + + boolean viaProto = false; + + public ReportNewAggregatorsInfoResponsePBImpl() { + builder = ReportNewAggregatorsInfoResponseProto.newBuilder(); + } + + public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public ReportNewAggregatorsInfoResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java new file mode 100644 index 0000000..67c377d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + + +@Private +public abstract class AppAggregatorsMap { + + public static AppAggregatorsMap newInstance( + ApplicationId id, String aggregatorAddr) { + AppAggregatorsMap appAggregatorMap = + Records.newRecord(AppAggregatorsMap.class); + appAggregatorMap.setApplicationId(id); + appAggregatorMap.setAggregatorAddr(aggregatorAddr); + return appAggregatorMap; + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId( + ApplicationId id); + + public abstract String
getAggregatorAddr(); + + public abstract void setAggregatorAddr( + String addr); + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java new file mode 100644 index 0000000..32903e2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java @@ -0,0 +1,151 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; + +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class AppAggregatorsMapPBImpl extends AppAggregatorsMap { + + AppAggregatorsMapProto proto = + AppAggregatorsMapProto.getDefaultInstance(); + + AppAggregatorsMapProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId appId = null; + private String aggregatorAddr = null; + + public AppAggregatorsMapPBImpl() { + builder = AppAggregatorsMapProto.newBuilder(); + } + + public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) { + this.proto = proto; + viaProto = true; + } + + public AppAggregatorsMapProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ApplicationId getApplicationId() { + AppAggregatorsMapProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.appId == null && p.hasAppId()) { + this.appId = convertFromProtoFormat(p.getAppId()); + } + return this.appId; + } + + @Override + public String getAggregatorAddr() { + AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder; + if (this.aggregatorAddr == null + && p.hasAppAggregatorAddr()) { + this.aggregatorAddr = p.getAppAggregatorAddr(); + } + return this.aggregatorAddr; + } + + @Override + public void setApplicationId(ApplicationId appId) { + maybeInitBuilder(); + if (appId == null) { + builder.clearAppId(); + } + this.appId = appId; + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + maybeInitBuilder(); + if (aggregatorAddr == null) { + builder.clearAppAggregatorAddr(); + } + this.aggregatorAddr = aggregatorAddr; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AppAggregatorsMapProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + if (this.aggregatorAddr != null) { + builder.setAppAggregatorAddr(this.aggregatorAddr); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto new file mode 100644 index 0000000..d7b05c1 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "AggregatorNodemanagerProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_server_common_service_protos.proto"; + +service AggregatorNodemanagerProtocolService { + rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 91473c5..3b03f58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -47,6 
+47,7 @@ message NodeHeartbeatRequestProto { optional NodeStatusProto node_status = 1; optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; + repeated AppAggregatorsMapProto registered_aggregators = 4; } message NodeHeartbeatResponseProto { @@ -60,6 +61,7 @@ message NodeHeartbeatResponseProto { optional string diagnostics_message = 8; repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; + repeated AppAggregatorsMapProto app_aggregators_map = 11; } message SystemCredentialsForAppsProto { @@ -67,6 +69,25 @@ message SystemCredentialsForAppsProto { optional bytes credentialsForApp = 2; } +//////////////////////////////////////////////////////////////////////// +////// From aggregator_nodemanager_protocol //////////////////////////// +//////////////////////////////////////////////////////////////////////// +message AppAggregatorsMapProto { + optional ApplicationIdProto appId = 1; + optional string appAggregatorAddr = 2; +} + +////////////////////////////////////////////////////// +/////// aggregator_nodemanager_protocol ////////////// +////////////////////////////////////////////////////// +message ReportNewAggregatorsInfoRequestProto { + repeated AppAggregatorsMapProto app_aggregators = 1; +} + +message ReportNewAggregatorsInfoResponseProto { +} + + message NMContainerStatusProto { optional ContainerIdProto container_id = 1; optional ContainerStateProto container_state = 2; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java new file mode 100644 index 0000000..af9d60f --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -0,0 +1,345 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestRPC { + + private static final String EXCEPTION_MSG = "test error"; + private static final String EXCEPTION_CAUSE = "exception cause"; + private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + public static final String ILLEGAL_NUMBER_MESSAGE = + "aggregators' number in 
ReportNewAggregatorsInfoRequest is not ONE."; + + public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0"; + + public static final ApplicationId DEFAULT_APP_ID = + ApplicationId.newInstance(0, 0); + + @Test + public void testUnknownCall() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + + // Any unrelated protocol would do + ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); + + try { + proxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } finally { + server.stop(); + } + } + + @Test + public void testRPCOnAggregatorNodeManagerProtocol() throws IOException { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class + .getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = rpc.getServer(AggregatorNodemanagerProtocol.class, + new DummyNMAggregatorService(), addr, conf, null, 1); + server.start(); + + // Test unrelated protocol wouldn't get response + ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy( + 
ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf); + + try { + unknownProxy.getNewApplication(Records + .newRecord(GetNewApplicationRequest.class)); + Assert.fail("Excepted RPC call to fail with unknown method."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().matches( + "Unknown method getNewApplication called on.*" + + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol" + + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol.")); + } catch (Exception e) { + e.printStackTrace(); + } + + // Test AggregatorNodemanagerProtocol get proper response + AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy( + AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf); + // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get + // normally response. + try { + ReportNewAggregatorsInfoRequest request = + ReportNewAggregatorsInfoRequest.newInstance( + DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR); + proxy.reportNewAggregatorInfo(request); + } catch (YarnException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify empty request get YarnException back (by design in + // DummyNMAggregatorService) + try { + proxy.reportNewAggregatorInfo(Records + .newRecord(ReportNewAggregatorsInfoRequest.class)); + Assert.fail("Excepted RPC call to fail with YarnException."); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE)); + } + + server.stop(); + } + + @Test + public void testHadoopProtoRPC() throws Exception { + test(HadoopYarnProtoRPC.class.getName()); + } + + private void test(String rpcClass) throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + Server server = 
rpc.getServer(ContainerManagementProtocol.class, + new DummyContainerManager(), addr, conf, null, 1); + server.start(); + RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class); + ContainerManagementProtocol proxy = (ContainerManagementProtocol) + rpc.getProxy(ContainerManagementProtocol.class, + NetUtils.getConnectAddress(server), conf); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + ApplicationId applicationId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 100); + NodeId nodeId = NodeId.newInstance("localhost", 1234); + Resource resource = Resource.newInstance(1234, 2); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier(containerId, "localhost", "user", + resource, System.currentTimeMillis() + 10000, 42, 42, + Priority.newInstance(0), 0); + Token containerToken = newContainerToken(nodeId, "password".getBytes(), + containerTokenIdentifier); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + containerToken); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + proxy.startContainers(allRequests); + + List containerIds = new ArrayList(); + containerIds.add(containerId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + GetContainerStatusesResponse response = + proxy.getContainerStatuses(gcsRequest); + List statuses = response.getContainerStatuses(); + + //test remote exception + boolean exception = false; + try { + StopContainersRequest stopRequest = + recordFactory.newRecordInstance(StopContainersRequest.class); + stopRequest.setContainerIds(containerIds); + 
proxy.stopContainers(stopRequest); + } catch (YarnException e) { + exception = true; + Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG)); + Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE)); + System.out.println("Test Exception is " + e.getMessage()); + } catch (Exception ex) { + ex.printStackTrace(); + } finally { + server.stop(); + } + Assert.assertTrue(exception); + Assert.assertNotNull(statuses.get(0)); + Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState()); + } + + public class DummyContainerManager implements ContainerManagementProtocol { + + private List statuses = new ArrayList(); + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) + throws YarnException { + GetContainerStatusesResponse response = + recordFactory.newRecordInstance(GetContainerStatusesResponse.class); + response.setContainerStatuses(statuses); + return response; + } + + @Override + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException { + StartContainersResponse response = + recordFactory.newRecordInstance(StartContainersResponse.class); + for (StartContainerRequest request : requests.getStartContainerRequests()) { + Token containerToken = request.getContainerToken(); + ContainerTokenIdentifier tokenId = null; + + try { + tokenId = newContainerTokenIdentifier(containerToken); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } + ContainerStatus status = + recordFactory.newRecordInstance(ContainerStatus.class); + status.setState(ContainerState.RUNNING); + status.setContainerId(tokenId.getContainerID()); + status.setExitStatus(0); + statuses.add(status); + + } + return response; + } + + @Override + public StopContainersResponse stopContainers(StopContainersRequest request) + throws YarnException { + Exception e = new Exception(EXCEPTION_MSG, + new Exception(EXCEPTION_CAUSE)); + throw new YarnException(e); + } + } + + public 
static ContainerTokenIdentifier newContainerTokenIdentifier( + Token containerToken) throws IOException { + org.apache.hadoop.security.token.Token token = + new org.apache.hadoop.security.token.Token( + containerToken.getIdentifier() + .array(), containerToken.getPassword().array(), new Text( + containerToken.getKind()), + new Text(containerToken.getService())); + return token.decodeIdentifier(); + } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = + Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + // A dummy implementation for AggregatorNodemanagerProtocol for test purpose, + // it only can accept one appID, aggregatorAddr pair or throw exceptions + public class DummyNMAggregatorService + implements AggregatorNodemanagerProtocol { + + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) + throws YarnException, IOException { + List appAggregators = request.getAppAggregatorsList(); + if (appAggregators.size() == 1) { + // check default appID and aggregatorAddr + AppAggregatorsMap appAggregator = appAggregators.get(0); + Assert.assertEquals(appAggregator.getApplicationId(), + DEFAULT_APP_ID); + Assert.assertEquals(appAggregator.getAggregatorAddr(), + DEFAULT_AGGREGATOR_ADDR); + } else { + throw new YarnException(ILLEGAL_NUMBER_MESSAGE); + } + + ReportNewAggregatorsInfoResponse response = + recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class); + return response; + } + } + +} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index 20983b6..47cf8ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -24,6 +24,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -89,11 +91,14 @@ public void testNodeHeartbeatRequestPBImpl() { original.setLastKnownContainerTokenMasterKey(getMasterKey()); original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); + Map aggregators = getAggregators(); + original.setRegisteredAggregators(aggregators); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); + assertEquals(aggregators, copy.getRegisteredAggregators()); } /** @@ -110,6 +115,8 @@ public void testNodeHeartbeatResponsePBImpl() { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); + Map aggregators = getAggregators(); + original.setAppAggregatorsMap(aggregators); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -119,6 +126,7 @@ public void testNodeHeartbeatResponsePBImpl() { assertEquals(1, 
copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); + assertEquals(aggregators, copy.getAppAggregatorsMap()); } /** @@ -208,6 +216,15 @@ public void testNodeStatusPBImpl() { } + private Map getAggregators() { + ApplicationId appID = ApplicationId.newInstance(1L, 1); + String aggregatorAddr = "localhost:0"; + Map aggregatorMap = + new HashMap(); + aggregatorMap.put(appID, aggregatorAddr); + return aggregatorMap; + } + private ContainerStatus getContainerStatus(int applicationId, int containerID, int appAttemptId) { ContainerStatus status = recordFactory diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 6e7e2ec..85f3f0d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -57,6 +57,19 @@ ConcurrentMap getApplications(); Map getSystemCredentialsForApps(); + + /** + * Get the registered aggregators that located on this NM. + * @return registered + */ + Map getRegisteredAggregators(); + + /** + * Return the known aggregators which get from RM for all active applications + * running on this NM. + * @return known aggregators. 
+ */ + Map getKnownAggregators(); ConcurrentMap getContainers(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a4be120..10143db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -84,8 +85,9 @@ private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NMAggregatorService nmAggregatorService; private NodeStatusUpdater nodeStatusUpdater; - private static CompositeServiceShutdownHook nodeManagerShutdownHook; + private static CompositeServiceShutdownHook nodeManagerShutdownHook; private NMStateStoreService nmStore = null; private AtomicBoolean isStopping = new AtomicBoolean(false); @@ -112,6 +114,10 @@ protected ContainerManagerImpl createContainerManager(Context context, return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, metrics, aclsManager, dirsHandler); } + + protected 
NMAggregatorService createNMAggregatorService(Context context) { + return new NMAggregatorService(context); + } protected WebServer createWebServer(Context nmContext, ResourceView resourceView, ApplicationACLsManager aclsManager, @@ -268,6 +274,9 @@ protected void serviceInit(Configuration conf) throws Exception { addService(dispatcher); DefaultMetricsSystem.initialize("NodeManager"); + + this.nmAggregatorService = createNMAggregatorService(context); + addService(nmAggregatorService); // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. @@ -345,6 +354,12 @@ public void run() { protected final ConcurrentMap containers = new ConcurrentSkipListMap(); + + protected Map registeredAggregators = + new ConcurrentHashMap(); + + protected Map knownAggregators = + new ConcurrentHashMap(); private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; @@ -460,6 +475,30 @@ public void setSystemCrendentialsForApps( Map systemCredentials) { this.systemCredentials = systemCredentials; } + + @Override + public Map getRegisteredAggregators() { + return this.registeredAggregators; + } + + public void addRegisteredAggregators( + Map newRegisteredAggregators) { + this.registeredAggregators.putAll(newRegisteredAggregators); + // Update to knownAggregators as well so it can immediately be consumed by + // this NM's TimelineClient. 
+ this.knownAggregators.putAll(newRegisteredAggregators); + } + + @Override + public Map getKnownAggregators() { + return this.knownAggregators; + } + + public void addKnownAggregators( + Map knownAggregators) { + this.knownAggregators.putAll(knownAggregators); + } + } @@ -523,6 +562,11 @@ Dispatcher getNMDispatcher(){ public Context getNMContext() { return this.context; } + + // For testing + NMAggregatorService getNMAggregatorService() { + return this.nmAggregatorService; + } public static void main(String[] args) throws IOException { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 6ddd7e4..c855833 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -592,7 +592,8 @@ public void run() { NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey(), NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() - .getCurrentKey()); + .getCurrentKey(), + NodeStatusUpdaterImpl.this.context.getRegisteredAggregators()); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -655,6 +656,10 @@ public void run() { ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + Map knownAggregators = response.getAppAggregatorsMap(); + 
((NodeManager.NMContext)context).addKnownAggregators(knownAggregators); + } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java new file mode 100644 index 0000000..17150ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.CompositeService; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; + +public class NMAggregatorService extends CompositeService implements + AggregatorNodemanagerProtocol { + + private static final Log LOG = LogFactory.getLog(NMAggregatorService.class); + + final Context context; + + private Server server; + + public NMAggregatorService(Context context) { + + super(NMAggregatorService.class.getName()); + this.context = context; + } + + @Override + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + + InetSocketAddress aggregatorServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT); + + Configuration serverConf = new Configuration(conf); + + // TODO Security settings. 
+ YarnRPC rpc = YarnRPC.create(conf); + + server = + rpc.getServer(AggregatorNodemanagerProtocol.class, this, + aggregatorServerAddress, serverConf, + this.context.getNMTokenSecretManager(), + conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT)); + + server.start(); + // start remaining services + super.serviceStart(); + LOG.info("NMAggregatorService started at " + aggregatorServerAddress); + } + + + @Override + public void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + // TODO may cleanup app aggregators running on this NM in future. + super.serviceStop(); + } + + @Override + public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo( + ReportNewAggregatorsInfoRequest request) throws IOException { + List newAggregatorsList = request.getAppAggregatorsList(); + if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) { + Map newAggregatorsMap = + new HashMap(); + for (AppAggregatorsMap aggregator : newAggregatorsList) { + newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr()); + } + ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap); + } + + return ReportNewAggregatorsInfoResponse.newInstance(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index a73b113..6bf3bbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -425,6 +425,10 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { new LogHandlerAppFinishedEvent(app.appId)); app.context.getNMTokenSecretManager().appFinished(app.getAppId()); + // Remove aggregator info for finished apps. + // TODO check we remove related aggregators info in failure cases (YARN-3038) + app.context.getRegisteredAggregators().remove(app.getAppId()); + app.context.getKnownAggregators().remove(app.getAppId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 1c7f987..2eb1a7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -344,6 +344,8 @@ public FinishApplicationMasterResponse finishApplicationMaster( RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + // Remove aggregator address when the app finishes; rmApp may be absent here. + if (rmApp != null) { rmApp.removeAggregatorAddr(); } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after // RM work-preserving restart.
@@ -576,6 +578,10 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setAvailableResources(allocation.getResourceLimit()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add aggregator address for this application + allocateResponse.setAggregatorAddr( + this.rmContext.getRMApps().get(applicationId).getAggregatorAddr()); // add preemption to the allocateResponse message (if any) allocateResponse diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 0de556b..f163a28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,6 +21,9 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; 
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; @@ -406,6 +410,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); return resync; } + + // Check & update aggregators info from request. + // TODO make sure it won't have race condition issue for AM failed over case + // that the older registration could possibly override the newer one. + updateAppAggregatorsMap(request); // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils @@ -421,15 +430,72 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (!systemCredentials.isEmpty()) { nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } + + // Return aggregators' map that NM needs to know + // TODO we should optimize this to only include aggregator info that NM + // doesn't know yet. + List keepAliveApps = remoteNodeStatus.getKeepAliveApplications(); + if (keepAliveApps != null) { + setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); + } // 4. Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + keepAliveApps, nodeHeartBeatResponse)); return nodeHeartBeatResponse; } + + private void setAppAggregatorsMapToResponse( + List liveApps, NodeHeartbeatResponse response) { + Map liveAppAggregatorsMap = new + ConcurrentHashMap(); + Map rmApps = rmContext.getRMApps(); + for (ApplicationId appId : liveApps) { + String appAggregatorAddr = rmApps.containsKey(appId) ? rmApps.get(appId).getAggregatorAddr() : null; + if (appAggregatorAddr != null) { + liveAppAggregatorsMap.put(appId, appAggregatorAddr); + } else { + // Log a debug message if the aggregator address is not found.
+ if (LOG.isDebugEnabled()) { + LOG.debug("Aggregator for application: " + appId + " hasn't registered yet!"); + } + } + response.setAppAggregatorsMap(liveAppAggregatorsMap); + } + + private void updateAppAggregatorsMap(NodeHeartbeatRequest request) { + Map registeredAggregatorsMap = + request.getRegisteredAggregators(); + if (registeredAggregatorsMap != null + && !registeredAggregatorsMap.isEmpty()) { + Map rmApps = rmContext.getRMApps(); + for (Map.Entry entry: + registeredAggregatorsMap.entrySet()) { + ApplicationId appId = entry.getKey(); + String aggregatorAddr = entry.getValue(); + if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) { + RMApp rmApp = rmApps.get(appId); + if (rmApp == null) { + LOG.warn("Cannot update aggregator info because application ID: " + + appId + " is not found in RMContext!"); + } else { + String previousAggregatorAddr = rmApp.getAggregatorAddr(); + if (previousAggregatorAddr == null || + !previousAggregatorAddr.equals(aggregatorAddr)) { + // sending aggregator update event.
+ RMAppAggregatorUpdateEvent event = + new RMAppAggregatorUpdateEvent(appId, aggregatorAddr); + rmContext.getDispatcher().getEventHandler().handle(event); + } + } + } + } + } + } private void populateKeys(NodeHeartbeatRequest request, NodeHeartbeatResponse nodeHeartBeatResponse) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fbcaab9..f81edb2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -172,6 +172,23 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the tracking url for the application master. */ String getTrackingUrl(); + + /** + * The aggregator address for the application. + * @return the address for the application's aggregator. + */ + String getAggregatorAddr(); + + /** + * Set aggregator address for the application + * @param aggregatorAddr the address of aggregator + */ + void setAggregatorAddr(String aggregatorAddr); + + /** + * Remove aggregator address when application is finished or killed. + */ + void removeAggregatorAddr(); /** * The original tracking url for the application master. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java new file mode 100644 index 0000000..b43de44 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class RMAppAggregatorUpdateEvent extends RMAppEvent { + + private final String appAggregatorAddr; + + public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) { + super(appId, RMAppEventType.AGGREGATOR_UPDATE); + this.appAggregatorAddr = appAggregatorAddr; + } + + public String getAppAggregatorAddr(){ + return this.appAggregatorAddr; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 668c5e1..6e9460a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -30,6 +30,9 @@ // Source: Scheduler APP_ACCEPTED, + + // TODO add source later + AGGREGATOR_UPDATE, // Source: RMAppAttempt ATTEMPT_REGISTERED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737a..6a076ac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -134,6 +134,7 @@ private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private String aggregatorAddr; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; private String queue; @@ -165,6 +166,8 @@ // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, @@ -181,6 +184,8 @@ // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, @@ -199,6 +204,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -215,6 +222,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.MOVE, new 
RMAppMoveTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, @@ -241,6 +250,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -270,6 +281,8 @@ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -281,6 +294,8 @@ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -292,6 +307,8 @@ .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -488,6 +505,21 @@ 
public String getQueue() { public void setQueue(String queue) { this.queue = queue; } + + @Override + public String getAggregatorAddr() { + return this.aggregatorAddr; + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + this.aggregatorAddr = aggregatorAddr; + } + + @Override + public void removeAggregatorAddr() { + this.aggregatorAddr = null; + } @Override public String getName() { @@ -737,6 +769,8 @@ public void recover(RMState state) { this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + //TODO recover aggregator address. + //this.aggregatorAddr = appState.getAggregatorAddr(); for(int i=0; i { public void transition(RMAppImpl app, RMAppEvent event) { }; - } + private static final class RMAppAggregatorUpdateTransition + extends RMAppTransition { + + public void transition(RMAppImpl app, RMAppEvent event) { + LOG.info("Updating aggregator info for app: " + app.getApplicationId()); + + RMAppAggregatorUpdateEvent appAggregatorUpdateEvent = + (RMAppAggregatorUpdateEvent) event; + // Update aggregator address + app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr()); + + // TODO persistent to RMStateStore for recover + // Save to RMStateStore + }; + } + private static final class RMAppNodeUpdateTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index f8d92aa..0d0895a 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -92,6 +92,18 @@ public StringBuilder getDiagnostics() { throw new UnsupportedOperationException("Not supported yet."); } @Override + public String getAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void setAggregatorAddr(String aggregatorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override + public void removeAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public ApplicationId getApplicationId() { throw new UnsupportedOperationException("Not supported yet."); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index ec990f9..96952d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -271,4 +271,19 @@ public ReservationId getReservationId() { public ResourceRequest getAMResourceRequest() { return this.amReq; } + + @Override + public String getAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void 
removeAggregatorAddr() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void setAggregatorAddr(String aggregatorAddr) { + throw new UnsupportedOperationException("Not supported yet."); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java index cdc4e35..19920fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java @@ -94,10 +94,9 @@ protected void serviceStop() throws Exception { * @return whether it was added successfully */ public boolean addApplication(ApplicationId appId) { - String appIdString = appId.toString(); AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appIdString); - return (aggregatorCollection.putIfAbsent(appIdString, aggregator) + new AppLevelTimelineAggregator(appId.toString()); + return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java index 73b6d52..d6e2a18 
100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator; +import java.io.IOException; import java.net.URI; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -30,9 +32,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -62,6 +70,12 @@ // REST server for this aggregator collection private HttpServer2 timelineRestServer; + + private String timelineRestServerBindAddress; + + private AggregatorNodemanagerProtocol nmAggregatorService; + + private InetSocketAddress nmAggregatorServiceAddress; static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection"; @@ -74,6 +88,16 @@ static TimelineAggregatorsCollection getInstance() { } @Override + public void 
serviceInit(Configuration conf) throws Exception { + this.nmAggregatorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT); + // Must delegate to CompositeService.serviceInit so child services are + // initialized and getConfig() is populated for later use. + super.serviceInit(conf); + } + + @Override protected void serviceStart() throws Exception { startWebApp(); super.serviceStart(); } @@ -95,9 +119,13 @@ protected void serviceStop() throws Exception { * starting the app level service * @return the aggregator associated with id after the potential put. */ - public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) { + public TimelineAggregator putIfAbsent(ApplicationId appId, + TimelineAggregator aggregator) { + String id = appId.toString(); + TimelineAggregator aggregatorInTable; + boolean aggregatorIsNew = false; synchronized (aggregators) { - TimelineAggregator aggregatorInTable = aggregators.get(id); + aggregatorInTable = aggregators.get(id); if (aggregatorInTable == null) { try { // initialize, start, and add it to the collection so it can be @@ -106,16 +134,30 @@ public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) aggregator.start(); aggregators.put(id, aggregator); LOG.info("the aggregator for " + id + " was added"); - return aggregator; + aggregatorInTable = aggregator; + aggregatorIsNew = true; } catch (Exception e) { throw new YarnRuntimeException(e); } } else { String msg = "the aggregator for " + id + " already exists!"; LOG.error(msg); - return aggregatorInTable; + } + + } + // Report to NM if a new aggregator is added. 
+ if (aggregatorIsNew) { + try { + reportNewAggregatorToNM(appId); + } catch (Exception e) { + // throw exception here as it cannot be used if failed report to NM + LOG.error("Failed to report a new aggregator for application: " + appId + + " to NM Aggregator Services."); + throw new YarnRuntimeException(e); } } + + return aggregatorInTable; } /** @@ -167,7 +209,10 @@ private void startWebApp() { String bindAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); - LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); + this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( + NetUtils.createSocketAddr(bindAddress)); + LOG.info("Instantiating the per-node aggregator webapp at " + + timelineRestServerBindAddress); try { Configuration confForInfoServer = new Configuration(conf); confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); @@ -200,4 +245,27 @@ private void startWebApp() { throw new YarnRuntimeException(msg, e); } } + + private void reportNewAggregatorToNM(ApplicationId appId) + throws YarnException, IOException { + this.nmAggregatorService = getNMAggregatorService(); + ReportNewAggregatorsInfoRequest request = + ReportNewAggregatorsInfoRequest.newInstance(appId, + this.timelineRestServerBindAddress); + LOG.info("Report a new aggregator for application: " + appId + + " to NM Aggregator Services."); + nmAggregatorService.reportNewAggregatorInfo(request); + } + + // protected for test + protected AggregatorNodemanagerProtocol getNMAggregatorService(){ + Configuration conf = getConfig(); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. 
+ return (AggregatorNodemanagerProtocol) rpc.getProxy( + AggregatorNodemanagerProtocol.class, + nmAggregatorServiceAddress, conf); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java index cec1d71..dd64629 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java @@ -32,6 +32,7 @@ import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Test; public class TestTimelineAggregatorsCollection { @@ -45,11 +46,11 @@ public void testMultithreadedAdd() throws Exception { final int NUM_APPS = 5; List> tasks = new ArrayList>(); for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); + final ApplicationId appId = ApplicationId.newInstance(0L, i); Callable task = new Callable() { public Boolean call() { AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appId); + new AppLevelTimelineAggregator(appId.toString()); return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); } }; @@ -79,14 +80,14 @@ public void testMultithreadedAddAndRemove() throws Exception { final int NUM_APPS = 5; List> tasks = new ArrayList>(); for (int i = 0; i < NUM_APPS; i++) { - final String appId = String.valueOf(i); 
+ final ApplicationId appId = ApplicationId.newInstance(0L, i); Callable task = new Callable() { public Boolean call() { AppLevelTimelineAggregator aggregator = - new AppLevelTimelineAggregator(appId); + new AppLevelTimelineAggregator(appId.toString()); boolean successPut = (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator); - return successPut && aggregatorCollection.remove(appId); + return successPut && aggregatorCollection.remove(appId.toString()); } }; tasks.add(task);