diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index c54ea1e..7a464d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -115,14 +115,7 @@ private DelegationTokenAuthenticatedURL.Token token; private UserGroupInformation authUgi; private String doAsUser; - private volatile String timelineServiceAddress; - - // Retry parameters for identifying new timeline service - // TODO consider to merge with connection retry - private int maxServiceRetries; - private long serviceRetryInterval; - private boolean newTimelineService = false; @Private @@ -132,7 +125,7 @@ // Abstract class for an operation that should be retried by timeline client private static abstract class TimelineClientRetryOp { // The operation that should be retried - public abstract Object run() throws IOException; + public abstract Object run() throws YarnException, IOException; // The method to indicate if we should retry given the incoming exception public abstract boolean shouldRetryOn(Exception e); } @@ -185,7 +178,7 @@ public TimelineClientConnectionRetry(Configuration conf) { } public Object retryOn(TimelineClientRetryOp op) - throws RuntimeException, IOException { + throws YarnException, IOException { int leftRetries = maxRetries; retried = false; @@ -216,9 +209,9 @@ public Object retryOn(TimelineClientRetryOp op) LOG.warn("Client retry sleep interrupted! "); } } - throw new RuntimeException("Failed to connect to timeline server. " + throw new IOException("Failed to connect to timeline server. " + "Connection retries limit exceeded. " - + "The posted timeline event may be missing"); + + "The posted timeline data may be missing"); }; private void logException(Exception e, int leftRetries) { @@ -256,7 +249,7 @@ public boolean shouldRetryOn(Exception e) { }; try { return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); - } catch (IOException e) { + } catch (YarnException | IOException e) { throw new ClientHandlerException("Jersey retry failed!\nMessage: " + e.getMessage()); } @@ -304,14 +297,7 @@ protected void serviceInit(Configuration conf) throws Exception { // old version timeline service need to get address from configuration // while new version need to auto discovery (with retry). - if (newTimelineService) { - maxServiceRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - serviceRetryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - } else { + if (!newTimelineService) { if (YarnConfiguration.useHttps(conf)) { setTimelineServiceAddress(conf.get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, @@ -374,58 +360,33 @@ public void putDomain(TimelineDomain domain) throws IOException, doPosting(domain, "domain"); } - // Used for new timeline service only @Private - public void putObjects(String path, MultivaluedMap params, - Object obj) throws IOException, YarnException { - - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - - // timelineServiceAddress could be stale, add retry logic here. - boolean needRetry = true; - while (needRetry) { - try { + public void putObjects(final String path, + final MultivaluedMap params, + final Object obj) throws IOException, YarnException { + // timelineServiceAddress could be null or stale, add retry logic here. + // It can't be done in ClientFilter because timelineServiceAddress needs to + // be re-evaluated on each retry + TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() { + @Override + public Object run() throws YarnException, IOException { + if (timelineServiceAddress == null) { + throw new ConnectException( + "Timeline service address is still not available"); + } URI uri = constructResURI(getConfig(), timelineServiceAddress, true); putObjects(uri, path, params, obj); - needRetry = false; + return null; } - catch (Exception e) { - // TODO only handle exception for timelineServiceAddress being updated. - // skip retry for other exceptions. - checkRetryWithSleep(retries, e); - retries--; - } - } - } - /** - * Check if reaching to maximum of retries. - * @param retries - * @param e - */ - private void checkRetryWithSleep(int retries, Exception e) throws - YarnException, IOException { - if (retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } else { - LOG.error( - "TimelineClient has reached to max retry times :" + - this.maxServiceRetries + " for service address: " + - timelineServiceAddress); - if (e instanceof YarnException) { - throw (YarnException)e; - } else if (e instanceof IOException) { - throw (IOException)e; - } else { - throw new YarnException(e); + @Override + public boolean shouldRetryOn(Exception e) { + // Only retry on connection exceptions + return e instanceof ConnectException || + e.getCause() instanceof ConnectException; } - } + }; + connectionRetry.retryOn(tokenRetryOp); } private void putObjects( @@ -437,17 +398,18 @@ private void putObjects( .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .put(ClientResponse.class, obj); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw new IOException(re); + } catch (ClientHandlerException e) { + LOG.error("Failed to get the response from the timeline server at " + + base + " because " + e.getCause().getMessage()); + if (LOG.isDebugEnabled()) { + LOG.error(e); + } + throw new IOException(e.getCause()); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = "Response from the timeline server is " + - ((resp == null) ? "null": + String msg = "Response from the timeline server is " + + ((resp == null) ? "null": "not successful," + " HTTP error code: " + resp.getStatus() + ", Server response:\n" + resp.getEntity(String.class)); LOG.error(msg); @@ -608,7 +570,6 @@ public boolean shouldRetryOn(Exception e) { return connectionRetry.retryOn(tokenRetryOp); } - // Old timeline service, no external retry logic. @Private @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { @@ -627,24 +588,6 @@ public ClientResponse doPostingObject(Object object, String path) { } } - /** - * Poll TimelineServiceAddress for maximum of retries times if it is null - * @param retries - * @return the left retry times - */ - private int pollTimelineServiceAddress(int retries) { - while (timelineServiceAddress == null && retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - timelineServiceAddress = getTimelineServiceAddress(); - retries--; - } - return retries; - } - private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java index 859a6c9..b304748 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; import java.net.ConnectException; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -228,8 +229,8 @@ public void testDelegationTokenOperationsRetry() throws Exception { client.getDelegationToken( UserGroupInformation.getCurrentUser().getShortUserName()); assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); + } catch (IOException e) { + assertException(client, e); } try { @@ -243,8 +244,8 @@ public void testDelegationTokenOperationsRetry() throws Exception { timelineDT.getKind(), new Text("0.0.0.0:8188"))); assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); + } catch (IOException e) { + assertException(client, e); } try { @@ -258,8 +259,8 @@ public void testDelegationTokenOperationsRetry() throws Exception { timelineDT.getKind(), new Text("0.0.0.0:8188"))); assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); + } catch (IOException e) { + assertException(client, e); } } finally { client.stop(); @@ -272,9 +273,9 @@ private static void assertFail() { + "Timeline server should be off to run this test."); } - private void assertException(TimelineClientImpl client, RuntimeException ce) { + private void assertException(TimelineClientImpl client, IOException e) { Assert.assertTrue( - "Handler exception for reason other than retry: " + ce.toString(), ce + "Handler exception for reason other than retry: " + e.toString(), e .getMessage().contains("Connection retries limit exceeded")); // we would expect this exception here, check if the client has retried Assert.assertTrue("Retry filter didn't perform any retries! ", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 54c806c..f2a49d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -66,6 +68,52 @@ public static void tearDownClass() throws Exception { } @Test + public void testClientRetryFailure() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + // Creating an invalid address + String host = collectorManager.getRestServerBindAddress().split(":")[0]; + int port = Integer.parseInt( + collectorManager.getRestServerBindAddress().split(":")[1]) + 1; + String[] serviceAddresses = new String[] { null, host + ":" + port }; + for (String serviceAddress : serviceAddresses) {; + try (TimelineClient client = TimelineClient.createTimelineClient( + ApplicationId.newInstance(0, 1))) { + client.setTimelineServiceAddress(serviceAddress); + client.init(conf); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + fail(); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Connection retries limit exceeded.")); + } + } + } + + @Test + public void testClientRetryInvalid() throws Exception { + // Non-OK response shouldn't be retried. + try (TimelineClient client = TimelineClient.createTimelineClient( + ApplicationId.newInstance(0, 2));){ + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); + client.init(new YarnConfiguration()); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + fail(); + } catch (YarnException e) { + assertTrue(e.getMessage().contains( + "Response from the timeline server is not successful,")); + } + } + + @Test public void testPutEntities() throws Exception { TimelineClient client = TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));