diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index cdef2e7..503675f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -761,6 +761,19 @@ public static final String AHS_STORAGE = YARN_PREFIX + "yarn.ahs.storage.class"; + /** host:port address for Application History Server API.*/ + public static final String AHS_HISTORY_PREFIX = "yarn.ahs."; + public static final String AHS_HISTORY_ADDRESS = + AHS_HISTORY_PREFIX + "address"; + public static final int DEFAULT_AHS_PORT = 10200; + public static final String DEFAULT_AHS_ADDRESS = + "0.0.0.0:" + DEFAULT_AHS_PORT; + + /** The number of threads to handle client API requests.*/ + public static final String AHS_CLIENT_THREAD_COUNT = + AHS_HISTORY_PREFIX + "client.thread-count"; + public static final int DEFAULT_AHS_CLIENT_THREAD_COUNT = 10; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/AHSClientService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/AHSClientService.java deleted file mode 100644 index cf6ccc5..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/AHSClientService.java +++ /dev/null @@ -1,115 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.applicationhistoryservice; - -import java.io.IOException; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; -import org.apache.hadoop.yarn.exceptions.YarnException; - -public class AHSClientService extends AbstractService implements - ApplicationHistoryProtocol { - - public AHSClientService() { - super(AHSClientService.class.getName()); - } - - @Override - public GetApplicationReportResponse getApplicationReport( - GetApplicationReportRequest request) throws YarnException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetApplicationsResponse - getApplications(GetApplicationsRequest request) throws YarnException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetDelegationTokenResponse getDelegationToken( - GetDelegationTokenRequest request) throws YarnException { - // TODO Auto-generated method stub - return null; - } - - @Override - public RenewDelegationTokenResponse renewDelegationToken( - RenewDelegationTokenRequest request) throws YarnException { - // TODO Auto-generated method stub - return null; - } - - @Override - public CancelDelegationTokenResponse cancelDelegationToken( - CancelDelegationTokenRequest request) throws YarnException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetApplicationAttemptReportResponse getApplicationAttemptReport( - GetApplicationAttemptReportRequest request) throws YarnException, - IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetApplicationAttemptsResponse getApplicationAttempts( - GetApplicationAttemptsRequest request) throws YarnException, IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetContainerReportResponse getContainer( - GetContainerReportRequest request) throws YarnException, IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public GetContainersResponse getContainers(GetContainersRequest request) - throws YarnException, IOException { - // TODO Auto-generated method stub - return null; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java new file mode 100644 index 0000000..d8c5997 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; + +public class ApplicationHistoryClientService extends AbstractService { + private static final Log LOG = + LogFactory.getLog(ApplicationHistoryClientService.class); + private ApplicationHistoryContext history; + private ApplicationHistoryProtocol protocolHandler; + private Server server; + private InetSocketAddress bindAddress; + + public ApplicationHistoryClientService(ApplicationHistoryContext history) { + super("ApplicationHistoryClientService"); + this.history = history; + this.protocolHandler = new ApplicationHSClientProtocolHandler(); + } + + protected void serviceStart() throws Exception { + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress address = + conf.getSocketAddr(YarnConfiguration.AHS_HISTORY_ADDRESS, + YarnConfiguration.DEFAULT_AHS_ADDRESS, + YarnConfiguration.DEFAULT_AHS_PORT); + + server = + rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler, + address, conf, null, conf.getInt( + YarnConfiguration.AHS_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_AHS_CLIENT_THREAD_COUNT)); + + server.start(); + this.bindAddress = + conf.updateConnectAddr(YarnConfiguration.AHS_HISTORY_ADDRESS, server + .getListenerAddress()); + LOG.info("Instantiated MRClientService at " + this.bindAddress); + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (server != null) { + server.stop(); + } + super.serviceStop(); + } + + @Private + public ApplicationHistoryProtocol getClientHandler() { + return this.protocolHandler; + } + + @Private + public InetSocketAddress getBindAddress() { + return this.bindAddress; + } + + private class ApplicationHSClientProtocolHandler implements + ApplicationHistoryProtocol { + + private RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } + + public ApplicationAttemptReport getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return history.getApplicationAttempt(appAttemptId); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + GetApplicationAttemptReportResponse response = + recordFactory + .newRecordInstance(GetApplicationAttemptReportResponse.class); + response.setApplicationAttemptReport(getApplicationAttempt(request + .getApplicationAttemptId())); + return response; + } + + public List getApplicationAttempts( + ApplicationId appId) throws IOException { + List appAttemptReports = + new ArrayList(history + .getApplicationAttempts(appId).values()); + return appAttemptReports; + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, + IOException { + GetApplicationAttemptsResponse response = + recordFactory.newRecordInstance(GetApplicationAttemptsResponse.class); + response.setApplicationAttemptList(getApplicationAttempts(request + .getApplicationId())); + return response; + } + + public List getApplications(ApplicationId appId) + throws IOException { + List appReports; + if (appId == null) { + appReports = + new ArrayList(history.getAllApplications() + .values()); + } else { + appReports = new ArrayList(); + appReports.add(history.getApplication(appId)); + } + return appReports; + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + ApplicationId applicationId = request.getApplicationId(); + List listReports = getApplications(applicationId); + GetApplicationReportResponse response = + recordFactory.newRecordInstance(GetApplicationReportResponse.class); + if (listReports.size() >= 0) { + response.setApplicationReport(listReports.get(0)); + } else { + response.setApplicationReport(null); + } + return response; + } + + @Override + public GetApplicationsResponse getApplications( + GetApplicationsRequest request) throws YarnException, IOException { + ApplicationId applicationId = null; + List listReports = getApplications(applicationId); + GetApplicationsResponse response = + recordFactory.newRecordInstance(GetApplicationsResponse.class); + if (listReports.size() >= 0) { + response.setApplicationList(listReports); + } else { + response.setApplicationList(null); + } + return response; + } + + @Override + public GetContainerReportResponse getContainer( + GetContainerReportRequest request) throws YarnException, IOException { + GetContainerReportResponse response = + recordFactory.newRecordInstance(GetContainerReportResponse.class); + response.setContainerReport(history + .getContainer(request.getContainerId())); + return response; + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + GetContainersResponse response = + recordFactory.newRecordInstance(GetContainersResponse.class); + response.setContainerList(new ArrayList(history + .getContainers(request.getApplicationAttemptId()).values())); + return response; + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + // TODO Auto-generated method stub + return null; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java index 0c20d49..3ed3546 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -1,25 +1,35 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.applicationhistoryservice; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * History server that keeps track of all types of history in the cluster. @@ -27,6 +37,14 @@ */ public class ApplicationHistoryServer extends CompositeService { + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final Log LOG = + LogFactory.getLog(ApplicationHistoryServer.class); + + ApplicationHistoryClientService ahsClientService; + ApplicationHistory historyService; + ApplicationHistoryContext historyContext; + public ApplicationHistoryServer() { super(ApplicationHistoryServer.class.getName()); } @@ -34,9 +52,55 @@ public ApplicationHistoryServer() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - AHSClientService ahsClientService = new AHSClientService(); + historyService = new ApplicationHistory(); + historyContext = (ApplicationHistoryContext) historyService; + ahsClientService = new ApplicationHistoryClientService(historyContext); addService(ahsClientService); + addService(historyService); AHSWebServer webServer = new AHSWebServer(); addService(webServer); } + + @Override + protected void serviceStart() throws Exception { + DefaultMetricsSystem.initialize("ApplicationHistoryServer"); + JvmMetrics.initSingleton("ApplicationHistoryServer", null); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + DefaultMetricsSystem.shutdown(); + super.serviceStop(); + } + + @Private + public ApplicationHistoryClientService getClientService() { + return this.ahsClientService; + } + + static ApplicationHistoryServer launchAppHistoryServer(String[] args) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args, + LOG); + ApplicationHistoryServer appHistoryServer = null; + try { + appHistoryServer = new ApplicationHistoryServer(); + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(appHistoryServer), + SHUTDOWN_HOOK_PRIORITY); + YarnConfiguration conf = new YarnConfiguration(); + appHistoryServer.init(conf); + appHistoryServer.start(); + } catch (Throwable t) { + LOG.fatal("Error starting ApplicationHistoryServer", t); + ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer"); + } + return appHistoryServer; + } + + public static void main(String[] args) { + launchAppHistoryServer(args); + } }