diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index c5e0308..3cb35a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.net.ConnectException; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,9 +77,11 @@ import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientRequest; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.filter.ClientFilter; @Public @Evolving @@ -97,14 +102,23 @@ = "show_container_log_info"; private static final String OUT_OPTION = "out"; private static final String SIZE_OPTION = "size"; + private static final String CLIENT_MAX_RETRY_OPTION = "client_max_retries"; + private static final String CLIENT_RETRY_INTERVAL_OPTION + = "client_retry_interval_ms"; public static final String HELP_CMD = "help"; private PrintStream outStream = System.out; private YarnClient yarnClient = null; + private Client webServiceClient = null; + + @Private + @VisibleForTesting + ClientConnectionRetry connectionRetry; @Override public int run(String[] args) throws Exception { try { yarnClient = createYarnClient(); + webServiceClient = Client.create(); return runCommand(args); } finally { if (yarnClient != null) { @@ -139,6 +153,8 @@ private int runCommand(String[] args) throws Exception { List amContainersList = new ArrayList(); String localDir = null; long bytes = Long.MAX_VALUE; + int maxRetries = 30; + long retryInterval = 1000; try { CommandLine commandLine = parser.parse(opts, args, false); appIdStr = commandLine.getOptionValue(APPLICATION_ID_OPTION); @@ -170,6 +186,14 @@ private int runCommand(String[] args) throws Exception { if (commandLine.hasOption(SIZE_OPTION)) { bytes = Long.parseLong(commandLine.getOptionValue(SIZE_OPTION)); } + if (commandLine.hasOption(CLIENT_MAX_RETRY_OPTION)) { + maxRetries = Integer.parseInt(commandLine.getOptionValue( + CLIENT_MAX_RETRY_OPTION)); + } + if (commandLine.hasOption(CLIENT_RETRY_INTERVAL_OPTION)) { + retryInterval = Long.parseLong(commandLine.getOptionValue( + CLIENT_RETRY_INTERVAL_OPTION)); + } } catch (ParseException e) { System.err.println("options parsing failed: " + e.getMessage()); printHelpMessage(printOpts); @@ -231,6 +255,11 @@ private int runCommand(String[] args) throws Exception { } } + // Set up Retry WebService Client + connectionRetry = new ClientConnectionRetry(maxRetries, retryInterval); + ClientJerseyRetryFilter retryFilter = new ClientJerseyRetryFilter(); + webServiceClient.addFilter(retryFilter); + LogCLIHelpers logCliHelper = new LogCLIHelpers(); logCliHelper.setConf(getConf()); @@ -341,7 +370,6 @@ private void printHelpMessage(Options options) { protected List getAMContainerInfoForRMWebService( Configuration conf, String appId) throws ClientHandlerException, UniformInterfaceException, JSONException { - Client webServiceClient = Client.create(); String webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(conf); WebResource webResource = webServiceClient.resource(webAppAddress); @@ -363,7 +391,6 @@ private void printHelpMessage(Options options) { private List getAMContainerInfoForAHSWebService( Configuration conf, String appId) throws ClientHandlerException, UniformInterfaceException, JSONException { - Client webServiceClient = Client.create(); String webAppAddress = WebAppUtils.getHttpSchemePrefix(conf) + WebAppUtils.getAHSWebAppURLWithoutScheme(conf); @@ -416,7 +443,6 @@ private boolean fetchAllLogFiles(String[] logFiles, String[] logFilesRegex) { throws IOException { List> logFileInfos = new ArrayList<>(); - Client webServiceClient = Client.create(); try { WebResource webResource = webServiceClient .resource(WebAppUtils.getHttpSchemePrefix(conf) + nodeHttpAddress); @@ -490,7 +516,6 @@ private PerContainerLogFileInfo generatePerContainerLogFileInfoFromJSON( lastModificationTime); } - @Private @VisibleForTesting public int printContainerLogsFromRunningApplication(Configuration conf, @@ -521,7 +546,6 @@ public int printContainerLogsFromRunningApplication(Configuration conf, ContainerLogsRequest newOptions = new ContainerLogsRequest(request); newOptions.setLogTypes(matchedFiles); - Client webServiceClient = Client.create(); boolean foundAnyLogs = false; byte[] buffer = new byte[65536]; for (String logFile : newOptions.getLogTypes()) { @@ -797,7 +821,13 @@ private int showApplicationLogInfo(ContainerLogsRequest request, } } - private Options createCommandOpts() { + @Private + @VisibleForTesting + /** + * Create Command Options. + * @return the command options + */ + public Options createCommandOpts() { Options opts = new Options(); opts.addOption(HELP_CMD, false, "Displays help for all commands."); Option appIdOpt = @@ -859,6 +889,13 @@ private Options createCommandOpts() { opts.addOption(SIZE_OPTION, true, "Prints the log file's first 'n' bytes " + "or the last 'n' bytes. Use negative values as bytes to read from " + "the end and positive values as bytes to read from the beginning."); + opts.addOption(CLIENT_MAX_RETRY_OPTION, true, "Set max retry number for a" + + " retry client to get the container logs for the running " + + "applications. Use a negative value to make retry forever. " + + "The default value is 30."); + opts.addOption(CLIENT_RETRY_INTERVAL_OPTION, true, + "Work with --client_max_retries to create a retry client. " + + "The default value is 1000."); opts.getOption(APPLICATION_ID_OPTION).setArgName("Application ID"); opts.getOption(CONTAINER_ID_OPTION).setArgName("Container ID"); opts.getOption(NODE_ADDRESS_OPTION).setArgName("Node Address"); @@ -866,10 +903,20 @@ private Options createCommandOpts() { opts.getOption(AM_CONTAINER_OPTION).setArgName("AM Containers"); opts.getOption(OUT_OPTION).setArgName("Local Directory"); opts.getOption(SIZE_OPTION).setArgName("size"); + opts.getOption(CLIENT_MAX_RETRY_OPTION).setArgName("Max Retries"); + opts.getOption(CLIENT_RETRY_INTERVAL_OPTION) + .setArgName("Retry Interval"); return opts; } - private Options createPrintOpts(Options commandOpts) { + @Private + @VisibleForTesting + /** + * Create Print options for helper message. + * @param commandOpts the options + * @return the print options + */ + public Options createPrintOpts(Options commandOpts) { Options printOpts = new Options(); printOpts.addOption(commandOpts.getOption(HELP_CMD)); printOpts.addOption(commandOpts.getOption(CONTAINER_ID_OPTION)); @@ -884,6 +931,8 @@ private Options createPrintOpts(Options commandOpts) { printOpts.addOption(commandOpts.getOption(SIZE_OPTION)); printOpts.addOption(commandOpts.getOption( PER_CONTAINER_LOG_FILES_REGEX_OPTION)); + printOpts.addOption(commandOpts.getOption(CLIENT_MAX_RETRY_OPTION)); + printOpts.addOption(commandOpts.getOption(CLIENT_RETRY_INTERVAL_OPTION)); return printOpts; } @@ -1286,4 +1335,120 @@ public String getNodeHttpAddressFromRMWebString(ContainerLogsRequest request) return nodeInfo.has("nodeHTTPAddress") ? nodeInfo.getString("nodeHTTPAddress") : null; } + + // Class to handle retry + static class ClientConnectionRetry { + + // maxRetries < 0 means keep trying + @Private + @VisibleForTesting + public int maxRetries; + + @Private + @VisibleForTesting + public long retryInterval; + + // Indicates if retries happened last time. Only tests should read it. + // In unit tests, retryOn() calls should _not_ be concurrent. + private boolean retried = false; + + @Private + @VisibleForTesting + boolean getRetired() { + return retried; + } + + // Constructor with default retry settings + public ClientConnectionRetry(int inputMaxRetries, + long inputRetryInterval) { + this.maxRetries = inputMaxRetries; + this.retryInterval = inputRetryInterval; + } + + public Object retryOn(ClientRetryOp op) + throws RuntimeException, IOException { + int leftRetries = maxRetries; + retried = false; + + // keep trying + while (true) { + try { + // try perform the op, if fail, keep retrying + return op.run(); + } catch (IOException | RuntimeException e) { + // break if there's no retries left + if (leftRetries == 0) { + break; + } + if (op.shouldRetryOn(e)) { + logException(e, leftRetries); + } else { + throw e; + } + } + if (leftRetries > 0) { + leftRetries--; + } + retried = true; + try { + // sleep for the given time interval + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + System.out.println("Client retry sleep interrupted! "); + } + } + throw new RuntimeException("Connection retries limit exceeded."); + }; + + private void logException(Exception e, int leftRetries) { + if (leftRetries > 0) { + System.out.println("Exception caught by ClientConnectionRetry," + + " will try " + leftRetries + " more time(s).\nMessage: " + + e.getMessage()); + } else { + // note that maxRetries may be -1 at the very beginning + System.out.println("ConnectionException caught by ClientConnectionRetry," + + " will keep retrying.\nMessage: " + + e.getMessage()); + } + } + } + + private class ClientJerseyRetryFilter extends ClientFilter { + @Override + public ClientResponse handle(final ClientRequest cr) + throws ClientHandlerException { + // Set up the retry operation + ClientRetryOp jerseyRetryOp = new ClientRetryOp() { + @Override + public Object run() { + // Try pass the request, if fail, keep retrying + return getNext().handle(cr); + } + + @Override + public boolean shouldRetryOn(Exception e) { + // Only retry on connection exceptions + return (e instanceof ClientHandlerException) + && (e.getCause() instanceof ConnectException || + e.getCause() instanceof SocketTimeoutException || + e.getCause() instanceof SocketException); + } + }; + try { + return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); + } catch (IOException e) { + throw new ClientHandlerException("Jersey retry failed!\nMessage: " + + e.getMessage()); + } + } + } + + // Abstract class for an operation that should be retried by client + private static abstract class ClientRetryOp { + // The operation that should be retried + public abstract Object run() throws IOException; + // The method to indicate if we should retry given the incoming exception + public abstract boolean shouldRetryOn(Exception e); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 9b0268f..45c8214 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -48,9 +48,12 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -195,7 +198,8 @@ public void testUnknownApplicationId() throws Exception { "Unable to get ApplicationState")); } - @Test(timeout = 5000l) + @SuppressWarnings("rawtypes") + @Test (timeout = 10000) public void testHelpMessage() throws Exception { Configuration conf = new YarnConfiguration(); YarnClient mockYarnClient = createMockYarnClient( @@ -206,79 +210,12 @@ public void testHelpMessage() throws Exception { int exitCode = dumper.run(new String[]{}); assertTrue(exitCode == -1); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintWriter pw = new PrintWriter(baos); - pw.println("Retrieve logs for YARN applications."); - pw.println("usage: yarn logs -applicationId [OPTIONS]"); - pw.println(); - pw.println("general options are:"); - pw.println(" -am Prints the AM Container logs for"); - pw.println(" this application. Specify"); - pw.println(" comma-separated value to get logs"); - pw.println(" for related AM Container. For"); - pw.println(" example, If we specify -am 1,2,"); - pw.println(" we will get the logs for the"); - pw.println(" first AM Container as well as the"); - pw.println(" second AM Container. To get logs"); - pw.println(" for all AM Containers, use -am"); - pw.println(" ALL. To get logs for the latest"); - pw.println(" AM Container, use -am -1. By"); - pw.println(" default, it will print all"); - pw.println(" available logs. Work with"); - pw.println(" -log_files to get only specific"); - pw.println(" logs."); - pw.println(" -appOwner AppOwner (assumed to be current"); - pw.println(" user if not specified)"); - pw.println(" -containerId ContainerId. By default, it will"); - pw.println(" print all available logs. Work"); - pw.println(" with -log_files to get only"); - pw.println(" specific logs. If specified, the"); - pw.println(" applicationId can be omitted"); - pw.println(" -help Displays help for all commands."); - pw.println(" -list_nodes Show the list of nodes that"); - pw.println(" successfully aggregated logs."); - pw.println(" This option can only be used with"); - pw.println(" finished applications."); - pw.println(" -log_files Specify comma-separated value to"); - pw.println(" get exact matched log files. Use"); - pw.println(" \"ALL\" or \"*\" to fetch all the log"); - pw.println(" files for the container."); - pw.println(" -log_files_pattern Specify comma-separated value to"); - pw.println(" get matched log files by using"); - pw.println(" java regex. Use \".*\" to fetch all"); - pw.println(" the log files for the container."); - pw.println(" -nodeAddress NodeAddress in the format"); - pw.println(" nodename:port"); - pw.println(" -out Local directory for storing"); - pw.println(" individual container logs. The"); - pw.println(" container logs will be stored"); - pw.println(" based on the node the container"); - pw.println(" ran on."); - pw.println(" -show_application_log_info Show the containerIds which"); - pw.println(" belong to the specific"); - pw.println(" Application. You can combine this"); - pw.println(" with --nodeAddress to get"); - pw.println(" containerIds for all the"); - pw.println(" containers on the specific"); - pw.println(" NodeManager."); - pw.println(" -show_container_log_info Show the container log metadata,"); - pw.println(" including log-file names, the"); - pw.println(" size of the log files. You can"); - pw.println(" combine this with --containerId"); - pw.println(" to get log metadata for the"); - pw.println(" specific container, or with"); - pw.println(" --nodeAddress to get log metadata"); - pw.println(" for all the containers on the"); - pw.println(" specific NodeManager."); - pw.println(" -size Prints the log file's first 'n'"); - pw.println(" bytes or the last 'n' bytes. Use"); - pw.println(" negative values as bytes to read"); - pw.println(" from the end and positive values"); - pw.println(" as bytes to read from the"); - pw.println(" beginning."); - pw.close(); - String appReportStr = baos.toString("UTF-8"); - Assert.assertEquals(appReportStr, sysOutStream.toString()); + + Options options = dumper.createCommandOpts(); + for (Iterator i = options.getOptions().iterator(); i.hasNext();) { + Option option = (Option) i.next(); + Assert.assertTrue(sysOutStream.toString().contains(option.getOpt())); + } } @Test (timeout = 15000) @@ -608,6 +545,56 @@ public ContainerReport getContainerReport(String containerIdStr) fs.delete(new Path(rootLogDir), true); } + @Test + public void testCheckRetryCount() throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + NodeId nodeId = NodeId.newInstance("localhost", 1234); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); + + // Create a mock ApplicationAttempt Report + ApplicationAttemptReport mockAttemptReport = mock( + ApplicationAttemptReport.class); + doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId(); + List attemptReports = Arrays.asList( + mockAttemptReport); + + // Create one mock containerReport + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerReport mockContainerReport1 = mock(ContainerReport.class); + doReturn(containerId1).when(mockContainerReport1).getContainerId(); + doReturn(nodeId).when(mockContainerReport1).getAssignedNode(); + doReturn("http://localhost:2345").when(mockContainerReport1) + .getNodeHttpAddress(); + doReturn(ContainerState.RUNNING).when(mockContainerReport1) + .getContainerState(); + List containerReports = Arrays.asList( + mockContainerReport1); + // Mock the YarnClient, and it would report the previous created + // mockAttemptReport and previous two created mockContainerReports + YarnClient mockYarnClient = createMockYarnClient( + YarnApplicationState.RUNNING, ugi.getShortUserName(), true, + attemptReports, containerReports); + doReturn(mockContainerReport1).when(mockYarnClient).getContainerReport( + any(ContainerId.class)); + LogsCLI cli = new LogsCLIForTest(mockYarnClient); + cli.setConf(new YarnConfiguration()); + try { + cli.run(new String[] {"-containerId", + containerId1.toString(), "-client_max_retries", "5"}); + Assert.fail("Exception expected! " + + "NodeManager should be off to run this test. "); + } catch (RuntimeException ce) { + Assert.assertTrue( + "Handler exception for reason other than retry: " + ce.getMessage(), + ce.getMessage().contains("Connection retries limit exceeded")); + Assert.assertTrue("Retry filter didn't perform any retries! ", cli + .connectionRetry.getRetired()); + } + } + @Test (timeout = 5000) public void testGetRunningContainerLogs() throws Exception { UserGroupInformation ugi = UserGroupInformation.getCurrentUser();