diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 9232fc81f66..f5162e9102f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -25,8 +25,10 @@ import java.util.Map; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.UriBuilder; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -48,10 +50,8 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ServiceStatus; import org.apache.hadoop.yarn.service.conf.RestApiConstants; -import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.util.RMHAUtils; -import org.codehaus.jackson.map.PropertyNamingStrategy; import org.eclipse.jetty.util.UrlEncoded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,11 +147,7 @@ private String getServicePath(String appName) throws IOException { api.append("/"); api.append(appName); } - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } + appendUserNameIfRequired(api); return api.toString(); } @@ -162,15 +158,27 @@ private String getInstancesPath(String appName) throws IOException { api.append(url); api.append("/app/v1/services/").append(appName).append("/") .append(RestApiConstants.COMP_INSTANCES); - Configuration conf = getConfig(); - if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( - "simple")) { - api.append("?user.name=" + UrlEncoded - .encodeString(System.getProperty("user.name"))); - } + appendUserNameIfRequired(api); return api.toString(); } + private String getInstancePath(String appName, List components, + String version, List containerStates) throws IOException { + UriBuilder builder = UriBuilder.fromUri(getInstancesPath(appName)); + if (components != null && !components.isEmpty()) { + components.forEach(compName -> + builder.queryParam(RestApiConstants.PARAM_COMP_NAME, compName)); + } + if (!Strings.isNullOrEmpty(version)){ + builder.queryParam(RestApiConstants.PARAM_VERSION, version); + } + if (containerStates != null && !containerStates.isEmpty()){ + containerStates.forEach(state -> + builder.queryParam(RestApiConstants.PARAM_CONTAINER_STATE, state)); + } + return builder.build().toString(); + } + private String getComponentsPath(String appName) throws IOException { Preconditions.checkNotNull(appName); String url = getRMWebAddress(); @@ -178,13 +186,17 @@ private String getComponentsPath(String appName) throws IOException { api.append(url); api.append("/app/v1/services/").append(appName).append("/") .append(RestApiConstants.COMPONENTS); + appendUserNameIfRequired(api); + return api.toString(); + } + + private void appendUserNameIfRequired(StringBuilder builder) { Configuration conf = getConfig(); if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase( "simple")) { - api.append("?user.name=" + UrlEncoded + builder.append("?user.name=").append(UrlEncoded .encodeString(System.getProperty("user.name"))); } - return api.toString(); } private Builder getApiClient() throws IOException { @@ -553,7 +565,7 @@ public int actionUpgradeInstances(String appName, List compInstances) container.setState(ContainerState.UPGRADING); toUpgrade[idx++] = container; } - String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade); + String buffer = ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(toUpgrade); ClientResponse response = getApiClient(getInstancesPath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); @@ -577,7 +589,7 @@ public int actionUpgradeComponents(String appName, List components) component.setState(ComponentState.UPGRADING); toUpgrade[idx++] = component; } - String buffer = COMP_JSON_SERDE.toJson(toUpgrade); + String buffer = ServiceApiUtil.COMP_JSON_SERDE.toJson(toUpgrade); ClientResponse response = getApiClient(getComponentsPath(appName)) .put(ClientResponse.class, buffer); result = processResponse(response); @@ -599,11 +611,25 @@ public int actionCleanUp(String appName, String userName) throws return result; } - private static final JsonSerDeser CONTAINER_JSON_SERDE = - new JsonSerDeser<>(Container[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); - - private static final JsonSerDeser COMP_JSON_SERDE = - new JsonSerDeser<>(Component[].class, - PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + @Override + public String getInstances(String appName, List components, + String version, List containerStates) throws IOException, + YarnException { + try { + String uri = getInstancePath(appName, components, version, + containerStates); + ClientResponse response = getApiClient(uri).get(ClientResponse.class); + if (response.getStatus() != 200) { + StringBuilder sb = new StringBuilder(); + sb.append("Failed: HTTP error code: "); + sb.append(response.getStatus()); + sb.append(" ErrorMsg: ").append(response.getEntity(String.class)); + return sb.toString(); + } + return response.getEntity(String.class); + } catch (Exception e) { + LOG.error("Fail to get containers {}", e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index 82fadae8bc3..4db0ac8f409 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -44,14 +44,7 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -61,13 +54,7 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; @@ -582,6 +569,40 @@ public Response updateComponentInstances(@Context HttpServletRequest request, return Response.status(Status.NO_CONTENT).build(); } + @GET + @Path(COMP_INSTANCES_PATH) + @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8}) + public Response getComponentInstances(@Context HttpServletRequest request, + @PathParam(SERVICE_NAME) String serviceName, + @QueryParam(PARAM_COMP_NAME) List componentNames, + @QueryParam(PARAM_VERSION) String version, + @QueryParam(PARAM_CONTAINER_STATE) List containerStates) { + try { + UserGroupInformation ugi = getProxyUser(request); + LOG.info("GET: component instances for service = {}, compNames in {}, " + + "version = {}, containerStates in {}, user = {}", serviceName, + Objects.toString(componentNames, "[]"), Objects.toString(version, ""), + Objects.toString(containerStates, "[]"), ugi); + + List containerStatesDe = containerStates.stream().map( + ContainerState::valueOf).collect(Collectors.toList()); + + return Response.ok(getContainers(ugi, serviceName, componentNames, + version, containerStatesDe)).build(); + } catch (IllegalArgumentException iae) { + return formatResponse(Status.BAD_REQUEST, "valid container states are: " + + Arrays.toString(ContainerState.values())); + } catch (AccessControlException e) { + return formatResponse(Response.Status.FORBIDDEN, e.getMessage()); + } catch (IOException | InterruptedException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getMessage()); + } catch (UndeclaredThrowableException e) { + return formatResponse(Response.Status.INTERNAL_SERVER_ERROR, + e.getCause().getMessage()); + } + } + private Response flexService(Service service, UserGroupInformation ugi) throws IOException, InterruptedException { String appName = service.getName(); @@ -752,6 +773,22 @@ private Service getServiceFromClient(UserGroupInformation ugi, }); } + private Container[] getContainers(UserGroupInformation ugi, + String serviceName, List componentNames, String version, + List containerStates) throws IOException, + InterruptedException { + return ugi.doAs((PrivilegedExceptionAction) () -> { + Container[] result; + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + result = sc.getContainers(serviceName, componentNames, version, + containerStates); + sc.close(); + return result; + }); + } + /** * Used by negative test case. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 45ff98ac57d..652a314abef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -55,4 +57,7 @@ RestartServiceResponseProto restart(RestartServiceRequestProto request) CompInstancesUpgradeResponseProto upgrade( CompInstancesUpgradeRequestProto request) throws IOException, YarnException; + + GetCompInstancesResponseProto getCompInstances( + GetCompInstancesRequestProto request) throws IOException, YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index e97c3d64ab6..5bf183319fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -43,15 +45,18 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; +import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.utils.FilterUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX; @@ -194,4 +199,13 @@ public CompInstancesUpgradeResponseProto upgrade( } return CompInstancesUpgradeResponseProto.newBuilder().build(); } + + @Override + public GetCompInstancesResponseProto getCompInstances( + GetCompInstancesRequestProto request) throws IOException { + List containers = FilterUtils.filterInstances(context, request); + return GetCompInstancesResponseProto.newBuilder().setCompInstances( + ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray( + new Container[containers.size()]))).build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 699a4e508cb..4b67998ccca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; @@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; @@ -100,6 +103,7 @@ import java.text.MessageFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*; @@ -318,6 +322,49 @@ public int actionCleanUp(String appName, String userName) throws } } + @Override + public String getInstances(String appName, + List components, String version, List containerStates) + throws IOException, YarnException { + GetCompInstancesResponseProto result = filterContainers(appName, components, + version, containerStates); + return result.getCompInstances(); + } + + public Container[] getContainers(String appName, List components, + String version, List containerStates) + throws IOException, YarnException { + GetCompInstancesResponseProto result = filterContainers(appName, components, + version, containerStates != null ? containerStates.stream() + .map(Enum::toString).collect(Collectors.toList()) : null); + + return ServiceApiUtil.CONTAINER_JSON_SERDE.fromJson( + result.getCompInstances()); + } + + private GetCompInstancesResponseProto filterContainers(String appName, + List components, String version, + List containerStates) throws IOException, YarnException { + ApplicationReport appReport = yarnClient.getApplicationReport(getAppId( + appName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(appName + " AM hostname is empty."); + } + ClientAMProtocol proxy = createAMProxy(appName, appReport); + GetCompInstancesRequestProto.Builder req = GetCompInstancesRequestProto + .newBuilder(); + if (components != null && !components.isEmpty()) { + req.addAllComponentNames(components); + } + if (version != null) { + req.setVersion(version); + } + if (containerStates != null && !containerStates.isEmpty()){ + req.addAllContainerStates(containerStates); + } + return proxy.getCompInstances(req.build()); + } + public int actionUpgrade(Service service, List compInstances) throws IOException, YarnException { ApplicationReport appReport = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 529596d989e..64f35d37cba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -97,6 +97,7 @@ private long containerStartedTime = 0; // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + private String serviceVersion; private static final StateMachineFactory filterInstances(ServiceContext context, + ClientAMProtocol.GetCompInstancesRequestProto filterReq) { + List results = new ArrayList<>(); + Map instances = + context.scheduler.getLiveInstances(); + + instances.forEach(((containerId, instance) -> { + boolean include = true; + if (filterReq.getComponentNamesList() != null && + !filterReq.getComponentNamesList().isEmpty()) { + // filter by component name + if (!filterReq.getComponentNamesList().contains( + instance.getComponent().getName())) { + include = false; + } + } + + if (filterReq.getVersion() != null && !filterReq.getVersion().isEmpty()) { + // filter by version + String instanceServiceVersion = instance.getServiceVersion(); + if (instanceServiceVersion == null || !instanceServiceVersion.equals( + filterReq.getVersion())) { + include = false; + } + } + + if (filterReq.getContainerStatesList() != null && + !filterReq.getContainerStatesList().isEmpty()) { + // filter by state + if (!filterReq.getContainerStatesList().contains( + instance.getContainerState().toString())) { + include = false; + } + } + + if (include) { + results.add(instance.getContainerSpec()); + } + })); + + return results; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 705e04065c0..447250f9cda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -72,6 +72,15 @@ public static JsonSerDeser jsonSerDeser = new JsonSerDeser<>(Service.class, PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + + public static final JsonSerDeser CONTAINER_JSON_SERDE = + new JsonSerDeser<>(Container[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + + public static final JsonSerDeser COMP_JSON_SERDE = + new JsonSerDeser<>(Component[].class, + PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); + private static final PatternValidator namePattern = new PatternValidator("[a-z][a-z0-9-]*"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 91721b0d900..6166dedd1de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -32,6 +32,8 @@ service ClientAMProtocolService { returns (RestartServiceResponseProto); rpc upgrade(CompInstancesUpgradeRequestProto) returns (CompInstancesUpgradeResponseProto); + rpc getCompInstances(GetCompInstancesRequestProto) returns + (GetCompInstancesResponseProto); } message FlexComponentsRequestProto { @@ -81,4 +83,14 @@ message CompInstancesUpgradeRequestProto { } message CompInstancesUpgradeResponseProto { +} + +message GetCompInstancesRequestProto { + repeated string componentNames = 1; + optional string version = 2; + repeated string containerStates = 3; +} + +message GetCompInstancesResponseProto { + optional string compInstances = 1; } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java new file mode 100644 index 00000000000..89888c5cf97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.component.Component; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; +import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Mocked service context for a running service. + */ +public class MockRunningServiceContext extends ServiceContext { + + public MockRunningServiceContext(ServiceTestUtils.ServiceFSWatcher fsWatcher, + Service serviceDef) throws Exception { + super(); + this.service = serviceDef; + this.fs = fsWatcher.getFs(); + + ContainerLaunchService mockLaunchService = mock( + ContainerLaunchService.class); + + this.scheduler = new ServiceScheduler(this) { + @Override + protected YarnRegistryViewForProviders + createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + + @Override + public NMClientAsync createNMClient() { + NMClientAsync nmClientAsync = super.createNMClient(); + NMClient nmClient = mock(NMClient.class); + try { + when(nmClient.getContainerStatus(anyObject(), anyObject())) + .thenAnswer( + (Answer) invocation -> ContainerStatus + .newInstance((ContainerId) invocation.getArguments()[0], + org.apache.hadoop.yarn.api.records.ContainerState + .RUNNING, + "", 0)); + } catch (YarnException | IOException e) { + throw new RuntimeException(e); + } + nmClientAsync.setClient(nmClient); + return nmClientAsync; + } + + @Override + public ContainerLaunchService getContainerLaunchService() { + return mockLaunchService; + } + }; + this.scheduler.init(fsWatcher.getConf()); + + ServiceTestUtils.createServiceManager(this); + + doNothing().when(mockLaunchService). + reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); + stabilizeComponents(this); + } + + private void stabilizeComponents(ServiceContext context) { + + ApplicationId appId = ApplicationId.fromString(context.service.getId()); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + context.attemptId = attemptId; + Map + componentState = context.scheduler.getAllComponents(); + + int counter = 0; + for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : + context.service.getComponents()) { + Component component = new org.apache.hadoop.yarn.service.component. + Component(componentSpec, 1L, context); + componentState.put(component.getName(), component); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.FLEX)); + + for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { + counter++; + assignNewContainer(attemptId, counter, component); + } + + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CHECK_STABLE)); + } + } + + public void assignNewContainer(ApplicationAttemptId attemptId, + long containerNum, Component component) { + + Container container = org.apache.hadoop.yarn.api.records.Container + .newInstance(ContainerId.newContainerId(attemptId, containerNum), + NODE_ID, "localhost", null, null, + null); + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CONTAINER_ALLOCATED) + .setContainer(container).setContainerId(container.getId())); + ComponentInstance instance = this.scheduler.getLiveInstances().get( + container.getId()); + ComponentInstanceEvent startEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.START); + instance.handle(startEvent); + + ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( + container.getId(), ComponentInstanceEventType.BECOME_READY); + instance.handle(readyEvent); + } + + private static final NodeId NODE_ID = NodeId.fromString("localhost:0"); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index 363fe91373f..0e047c20b25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -166,7 +166,7 @@ public void testFlexComponents() throws Throwable { checkApp(serviceName, "master", 1L, 1000L, "qname"); } - @Test (timeout = 180000) + @Test public void testInitiateServiceUpgrade() throws Exception { String[] args = {"app", "-upgrade", "app-1", "-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON), @@ -185,7 +185,7 @@ public void testInitiateAutoFinalizeServiceUpgrade() throws Exception { Assert.assertEquals(result, 0); } - @Test (timeout = 180000) + @Test public void testUpgradeInstances() throws Exception { conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, DummyServiceClient.class.getName()); @@ -197,7 +197,7 @@ public void testUpgradeInstances() throws Exception { Assert.assertEquals(result, 0); } - @Test (timeout = 180000) + @Test public void testUpgradeComponents() throws Exception { conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, DummyServiceClient.class.getName()); @@ -209,6 +209,18 @@ public void testUpgradeComponents() throws Exception { Assert.assertEquals(result, 0); } + @Test + public void testGetInstances() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"container", "-list", "app-1", + "-components", "comp1,comp2", + "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + @Test (timeout = 180000) public void testEnableFastLaunch() throws Exception { fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) @@ -313,5 +325,12 @@ public int actionUpgradeComponents(String appName, List components) throws IOException, YarnException { return 0; } + + @Override + public String getInstances(String appName, List components, + String version, List containerStates) + throws IOException, YarnException { + return ""; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java index d3664ea1dc3..700655ce5de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.client; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -32,8 +33,12 @@ import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; +import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Container; @@ -41,6 +46,7 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.exceptions.ErrorStrings; +import org.apache.hadoop.yarn.service.utils.FilterUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.junit.Assert; import org.junit.Rule; @@ -52,6 +58,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.List; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -122,6 +129,26 @@ public void testActionCompInstanceUpgrade() throws Exception { client.stop(); } + @Test + public void testGetCompInstances() throws Exception { + Service service = createService(); + MockServiceClient client = MockServiceClient.create(rule, service, true); + + //upgrade the service + service.setVersion("v2"); + client.initiateUpgrade(service); + + //add containers to the component that needs to be upgraded. + Component comp = service.getComponents().iterator().next(); + ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L); + comp.addContainer(new Container().id(containerId.toString())); + + Container[] containers = client.getContainers(service.getName(), + Lists.newArrayList("compa"), "v1", null); + Assert.assertEquals("num containers", 2, containers.length); + client.stop(); + } + private Service createService() throws IOException, YarnException { Service service = ServiceTestUtils.createExampleApplication(); @@ -137,6 +164,7 @@ private Service createService() throws IOException, private final ClientAMProtocol amProxy; private Object proxyResponse; private Service service; + private ServiceContext context; private MockServiceClient() { amProxy = mock(ClientAMProtocol.class); @@ -147,8 +175,12 @@ private MockServiceClient() { static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule, Service service, boolean enableUpgrade) - throws IOException, YarnException { + throws Exception { MockServiceClient client = new MockServiceClient(); + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + service.setId(applicationId.toString()); + client.context = new MockRunningServiceContext(rule, service); YarnClient yarnClient = createMockYarnClient(); ApplicationReport appReport = mock(ApplicationReport.class); @@ -175,10 +207,28 @@ static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule, CompInstancesUpgradeRequestProto.class))).thenAnswer( (Answer) invocation -> { CompInstancesUpgradeResponseProto response = - CompInstancesUpgradeResponseProto.newBuilder().build(); + CompInstancesUpgradeResponseProto.newBuilder().build(); client.proxyResponse = response; return response; }); + + when(client.amProxy.getCompInstances(Matchers.any( + GetCompInstancesRequestProto.class))).thenAnswer( + (Answer) invocation -> { + + GetCompInstancesRequestProto req = (GetCompInstancesRequestProto) + invocation.getArguments()[0]; + List containers = FilterUtils.filterInstances( + client.context, req); + GetCompInstancesResponseProto response = + GetCompInstancesResponseProto.newBuilder().setCompInstances( + ServiceApiUtil.CONTAINER_JSON_SERDE.toJson( + containers.toArray(new Container[containers.size()]))) + .build(); + client.proxyResponse = response; + return response; + }); + client.setFileSystem(rule.getFs()); client.setYarnClient(yarnClient); client.service = service; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index d7c15ec731e..d5fb941d004 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -18,19 +18,10 @@ package org.apache.hadoop.yarn.service.component; -import org.apache.hadoop.registry.client.api.RegistryOperations; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.client.api.NMClient; -import org.apache.hadoop.yarn.client.api.async.NMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.ServiceContext; -import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; @@ -38,23 +29,15 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; - -import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; -import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; -import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.Iterator; -import java.util.Map; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; - -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -63,7 +46,6 @@ */ public class TestComponent { - private static final int WAIT_MS_PER_LOOP = 1000; static final Logger LOG = Logger.getLogger(TestComponent.class); @Rule @@ -115,7 +97,7 @@ public void testCheckState() throws Exception { @Test public void testContainerCompletedWhenUpgrading() throws Exception { String serviceName = "testContainerComplete"; - ServiceContext context = createTestContext(rule, serviceName); + MockRunningServiceContext context = createTestContext(rule, serviceName); Component comp = context.scheduler.getAllComponents().entrySet().iterator() .next().getValue(); @@ -148,7 +130,7 @@ public void testContainerCompletedWhenUpgrading() throws Exception { ComponentState.FLEXING, comp.getComponentSpec().getState()); // new container get allocated - assignNewContainer(context.attemptId, 10, context, comp); + context.assignNewContainer(context.attemptId, 10, comp); // second instance finished upgrading ComponentInstance instance2 = instanceIter.next(); @@ -174,7 +156,7 @@ public void testComponentStateUpdatesWithTerminatingComponents() throws serviceName); TestServiceManager.createDef(serviceName, testService); - ServiceContext context = createTestContext(rule, testService); + ServiceContext context = new MockRunningServiceContext(rule, testService); for (Component comp : context.scheduler.getAllComponents().values()) { @@ -225,114 +207,11 @@ public void testComponentStateUpdatesWithTerminatingComponents() throws return spec; } - public static ServiceContext createTestContext( + public static MockRunningServiceContext createTestContext( ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName) throws Exception { - return createTestContext(fsWatcher, + return new MockRunningServiceContext(fsWatcher, TestServiceManager.createBaseDef(serviceName)); } - - public static ServiceContext createTestContext( - ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef) - throws Exception { - ServiceContext context = new ServiceContext(); - context.service = serviceDef; - context.fs = fsWatcher.getFs(); - - ContainerLaunchService mockLaunchService = mock( - ContainerLaunchService.class); - - context.scheduler = new ServiceScheduler(context) { - @Override protected YarnRegistryViewForProviders - createYarnRegistryOperations( - ServiceContext context, RegistryOperations registryClient) { - return mock(YarnRegistryViewForProviders.class); - } - - @Override public NMClientAsync createNMClient() { - NMClientAsync nmClientAsync = super.createNMClient(); - NMClient nmClient = mock(NMClient.class); - try { - when(nmClient.getContainerStatus(anyObject(), anyObject())) - .thenAnswer( - (Answer) invocation -> ContainerStatus - .newInstance((ContainerId) invocation.getArguments()[0], - org.apache.hadoop.yarn.api.records.ContainerState - .RUNNING, - "", 0)); - } catch (YarnException | IOException e) { - throw new RuntimeException(e); - } - nmClientAsync.setClient(nmClient); - return nmClientAsync; - } - - @Override public ContainerLaunchService getContainerLaunchService() { - return mockLaunchService; - } - }; - context.scheduler.init(fsWatcher.getConf()); - - ServiceTestUtils.createServiceManager(context); - - doNothing().when(mockLaunchService). - reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); - stabilizeComponents(context); - - return context; - } - - private static void stabilizeComponents(ServiceContext context) { - - ApplicationId appId = ApplicationId.fromString(context.service.getId()); - ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); - context.attemptId = attemptId; - Map - componentState = context.scheduler.getAllComponents(); - - int counter = 0; - for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : - context.service.getComponents()) { - Component component = new org.apache.hadoop.yarn.service.component. - Component(componentSpec, 1L, context); - componentState.put(component.getName(), component); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.FLEX)); - - for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { - counter++; - assignNewContainer(attemptId, counter, context, component); - } - - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.CHECK_STABLE)); - } - } - - private static void assignNewContainer( - ApplicationAttemptId attemptId, long containerNum, - ServiceContext context, Component component) { - - - Container container = org.apache.hadoop.yarn.api.records.Container - .newInstance(ContainerId.newContainerId(attemptId, containerNum), - NODE_ID, "localhost", null, null, - null); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.CONTAINER_ALLOCATED) - .setContainer(container).setContainerId(container.getId())); - ComponentInstance instance = context.scheduler.getLiveInstances().get( - container.getId()); - ComponentInstanceEvent startEvent = new ComponentInstanceEvent( - container.getId(), ComponentInstanceEventType.START); - instance.handle(startEvent); - - ComponentInstanceEvent readyEvent = new ComponentInstanceEvent( - container.getId(), ComponentInstanceEventType.BECOME_READY); - instance.handle(readyEvent); - } - - private static final NodeId NODE_ID = NodeId.fromString("localhost:0"); - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index 26e8c931258..0e7816c876f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -60,19 +60,20 @@ */ public class TestComponentInstance { - @Rule public ServiceTestUtils.ServiceFSWatcher rule = + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); - @Test public void testContainerUpgrade() throws Exception { + @Test + public void testContainerUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerUpgrade"); - Component component = - context.scheduler.getAllComponents().entrySet().iterator().next() - .getValue(); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = - component.getAllComponentInstances().iterator().next(); + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); instance.handle(instanceEvent); @@ -82,16 +83,16 @@ containerSpec.getState()); } - @Test public void testContainerReadyAfterUpgrade() throws Exception { + @Test + public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerStarted"); - Component component = - context.scheduler.getAllComponents().entrySet().iterator().next() - .getValue(); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = - component.getAllComponentInstances().iterator().next(); + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); @@ -100,9 +101,8 @@ instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); Assert.assertEquals("instance not ready", ContainerState.READY, - instance.getCompSpec() - .getContainer(instance.getContainer().getId().toString()) - .getState()); + instance.getCompSpec().getContainer( + instance.getContainer().getId().toString()).getState()); } private void upgradeComponent(Component component) { @@ -113,9 +113,8 @@ private void upgradeComponent(Component component) { private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum - restartPolicy, - int nSucceededInstances, int nFailedInstances, int totalAsk, - int componentId) { + restartPolicy, int nSucceededInstances, int nFailedInstances, + int totalAsk, int componentId) { assert (nSucceededInstances + nFailedInstances) <= totalAsk; @@ -214,7 +213,8 @@ private ComponentInstance createComponentInstance(Component component, return componentInstance; } - @Test public void testComponentRestartPolicy() { + @Test + public void testComponentRestartPolicy() { Map allComponents = new HashMap<>(); Service mockService = mock(Service.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java new file mode 100644 index 00000000000..065c37ad17a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/utils/TestFilterUtils.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.utils; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto; +import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.TestServiceManager; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; +import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +public class TestFilterUtils { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testNoFilter() throws Exception { + GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder() + .build(); + List containers = FilterUtils.filterInstances( + new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")), req); + Assert.assertEquals("num containers", 4, containers.size()); + } + + @Test + public void testFilterWithComp() throws Exception { + GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder() + .addAllComponentNames(Lists.newArrayList("compa")).build(); + List containers = FilterUtils.filterInstances( + new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")), req); + Assert.assertEquals("num containers", 2, containers.size()); + } + + @Test + public void testFilterWithVersion() throws Exception { + ServiceContext sc = new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")); + GetCompInstancesRequestProto.Builder reqBuilder = + GetCompInstancesRequestProto.newBuilder(); + + reqBuilder.setVersion("v2"); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + reqBuilder.addAllComponentNames(Lists.newArrayList("compa")) + .setVersion("v1").build(); + + Assert.assertEquals("num containers", 2, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + reqBuilder.setVersion("v2").build(); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + } + + @Test + public void testFilterWithState() throws Exception { + ServiceContext sc = new MockRunningServiceContext(rule, + TestServiceManager.createBaseDef("service")); + GetCompInstancesRequestProto.Builder reqBuilder = + GetCompInstancesRequestProto.newBuilder(); + + reqBuilder.addAllContainerStates(Lists.newArrayList( + ContainerState.READY.toString())); + Assert.assertEquals("num containers", 4, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + + reqBuilder.clearContainerStates(); + reqBuilder.addAllContainerStates(Lists.newArrayList( + ContainerState.STOPPED.toString())); + Assert.assertEquals("num containers", 0, + FilterUtils.filterInstances(sc, reqBuilder.build()).size()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 1d26a96bb11..e71bae2c8e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -105,6 +105,8 @@ public static final String UPGRADE_FINALIZE = "finalize"; public static final String COMPONENT_INSTS = "instances"; public static final String COMPONENTS = "components"; + public static final String VERSION = "version"; + public static final String STATES = "states"; private static String firstArg = null; @@ -294,10 +296,39 @@ public int run(String[] args) throws Exception { opts.addOption(STATUS_CMD, true, "Prints the status of the container."); opts.addOption(LIST_CMD, true, - "List containers for application attempt."); + "List containers for application attempt when application " + + "attempt ID is provided. When application name is provided, " + + "then it finds the instances of the application based on app's " + + "own implementation, and -appTypes option must be specified " + + "unless it is the default yarn-service type. With app name, it " + + "supports optional use of -version to filter instances based on " + + "app version, -components to filter instances based on component " + + "names, -states to filter instances based on instance state."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.getOption(STATUS_CMD).setArgName("Container ID"); - opts.getOption(LIST_CMD).setArgName("Application Attempt ID"); + opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID"); + opts.addOption(APP_TYPE_CMD, true, "Works with -list to " + + "specify the app type when application name is provided."); + opts.getOption(APP_TYPE_CMD).setValueSeparator(','); + opts.getOption(APP_TYPE_CMD).setArgs(Option.UNLIMITED_VALUES); + opts.getOption(APP_TYPE_CMD).setArgName("Types"); + + opts.addOption(VERSION, true, "Works with -list " + + "to filter instances based on input application version."); + opts.getOption(VERSION).setArgs(1); + + opts.addOption(COMPONENTS, true, "Works with -list to " + + "filter instances based on input comma-separated list of " + + "component names."); + opts.getOption(COMPONENTS).setValueSeparator(','); + opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES); + + opts.addOption(STATES, true, "Works with -list to " + + "filter instances based on input comma-separated list of " + + "instance states."); + opts.getOption(STATES).setValueSeparator(','); + opts.getOption(STATES).setArgs(Option.UNLIMITED_VALUES); + opts.addOption(SIGNAL_CMD, true, "Signal the container. The available signal commands are " + java.util.Arrays.asList(SignalContainerCommand.values()) + @@ -426,11 +457,41 @@ public int run(String[] args) throws Exception { } listApplicationAttempts(cliParser.getOptionValue(LIST_CMD)); } else if (title.equalsIgnoreCase(CONTAINER)) { - if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD)) { + if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD, APP_TYPE_CMD, + VERSION, COMPONENTS, STATES)) { printUsage(title, opts); return exitCode; } - listContainers(cliParser.getOptionValue(LIST_CMD)); + String appAttemptIdOrName = cliParser.getOptionValue(LIST_CMD); + try { + // try parsing attempt id, if it succeeds, it means it's appId + ApplicationAttemptId.fromString(appAttemptIdOrName); + listContainers(appAttemptIdOrName); + } catch (IllegalArgumentException e) { + // not appAttemptIf format, it could be appName. + // Print app specific report, if app-type is not provided, + // assume it is yarn-service type. + AppAdminClient client = AppAdminClient + .createAppAdminClient(getSingleAppTypeFromCLI(cliParser), + getConf()); + String version = cliParser.getOptionValue(VERSION); + String[] components = cliParser.getOptionValues(COMPONENTS); + String[] instanceStates = cliParser.getOptionValues(STATES); + try { + sysout.println(client.getInstances(appAttemptIdOrName, + components == null ? null : Arrays.asList(components), + version, instanceStates == null ? null : + Arrays.asList(instanceStates))); + return 0; + } catch (ApplicationNotFoundException exception) { + System.err.println("Application with name '" + appAttemptIdOrName + + "' doesn't exist in RM or Timeline Server."); + return -1; + } catch (Exception ex) { + System.err.println(ex.getMessage()); + return -1; + } + } } } else if (cliParser.hasOption(KILL_CMD)) { if (hasAnyOtherCLIOptions(cliParser, opts, KILL_CMD)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 518cd1cc4f4..6b823b229b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2280,13 +2280,17 @@ private String createContainerCLIHelpMessage() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintWriter pw = new PrintWriter(baos); pw.println("usage: container"); + pw.println(" -appTypes Works with -list to specify the app type when application name is provided."); + pw.println(" -components Works with -list to filter instances based on input comma-separated list of component names."); pw.println(" -help Displays help for all commands."); - pw.println(" -list List containers for application attempt."); + pw.println(" -list List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state."); pw.println(" -signal Signal the container."); pw.println("The available signal commands are "); pw.println(java.util.Arrays.asList(SignalContainerCommand.values())); pw.println(" Default command is OUTPUT_THREAD_DUMP."); + pw.println(" -states Works with -list to filter instances based on input comma-separated list of instance states."); pw.println(" -status Prints the status of the container."); + pw.println(" -version Works with -list to filter instances based on input application version. "); pw.close(); try { return normalize(baos.toString("UTF-8")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 3cd1a787103..3fb4778327e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -282,4 +282,10 @@ public abstract int actionUpgradeComponents(String appName, public abstract int actionCleanUp(String appName, String userName) throws IOException, YarnException; + @Public + @Unstable + public abstract String getInstances(String appName, + List components, String version, List containerStates) + throws IOException, YarnException; + }