diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index c54ea1e..84ffded 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -115,14 +115,7 @@ private DelegationTokenAuthenticatedURL.Token token; private UserGroupInformation authUgi; private String doAsUser; - private volatile String timelineServiceAddress; - - // Retry parameters for identifying new timeline service - // TODO consider to merge with connection retry - private int maxServiceRetries; - private long serviceRetryInterval; - private boolean newTimelineService = false; @Private @@ -132,7 +125,7 @@ // Abstract class for an operation that should be retried by timeline client private static abstract class TimelineClientRetryOp { // The operation that should be retried - public abstract Object run() throws IOException; + public abstract Object run() throws YarnException, IOException; // The method to indicate if we should retry given the incoming exception public abstract boolean shouldRetryOn(Exception e); } @@ -185,7 +178,7 @@ public TimelineClientConnectionRetry(Configuration conf) { } public Object retryOn(TimelineClientRetryOp op) - throws RuntimeException, IOException { + throws RuntimeException, YarnException, IOException { int leftRetries = maxRetries; retried = false; @@ -218,7 +211,7 @@ public Object retryOn(TimelineClientRetryOp op) } throw new RuntimeException("Failed to connect to timeline server. " + "Connection retries limit exceeded. " - + "The posted timeline event may be missing"); + + "The posted timeline data may be missing"); }; private void logException(Exception e, int leftRetries) { @@ -256,7 +249,7 @@ public boolean shouldRetryOn(Exception e) { }; try { return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp); - } catch (IOException e) { + } catch (YarnException | IOException e) { throw new ClientHandlerException("Jersey retry failed!\nMessage: " + e.getMessage()); } @@ -304,14 +297,7 @@ protected void serviceInit(Configuration conf) throws Exception { // old version timeline service need to get address from configuration // while new version need to auto discovery (with retry). - if (newTimelineService) { - maxServiceRetries = conf.getInt( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); - serviceRetryInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); - } else { + if (!newTimelineService) { if (YarnConfiguration.useHttps(conf)) { setTimelineServiceAddress(conf.get( YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, @@ -374,58 +360,33 @@ public void putDomain(TimelineDomain domain) throws IOException, doPosting(domain, "domain"); } - // Used for new timeline service only @Private - public void putObjects(String path, MultivaluedMap params, - Object obj) throws IOException, YarnException { - - // timelineServiceAddress could haven't be initialized yet - // or stale (only for new timeline service) - int retries = pollTimelineServiceAddress(this.maxServiceRetries); - - // timelineServiceAddress could be stale, add retry logic here. - boolean needRetry = true; - while (needRetry) { - try { + public void putObjects(final String path, + final MultivaluedMap params, + final Object obj) throws IOException, YarnException { + // timelineServiceAddress could be null or stale, add retry logic here. + // It can't be done in ClientFilter because timelineServiceAddress needs to + // be re-evaluated on each retry + TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() { + @Override + public Object run() throws YarnException, IOException { + if (timelineServiceAddress == null) { + throw new ConnectException( + "Timeline service address is still not available"); + } URI uri = constructResURI(getConfig(), timelineServiceAddress, true); putObjects(uri, path, params, obj); - needRetry = false; + return null; } - catch (Exception e) { - // TODO only handle exception for timelineServiceAddress being updated. - // skip retry for other exceptions. - checkRetryWithSleep(retries, e); - retries--; - } - } - } - /** - * Check if reaching to maximum of retries. - * @param retries - * @param e - */ - private void checkRetryWithSleep(int retries, Exception e) throws - YarnException, IOException { - if (retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } else { - LOG.error( - "TimelineClient has reached to max retry times :" + - this.maxServiceRetries + " for service address: " + - timelineServiceAddress); - if (e instanceof YarnException) { - throw (YarnException)e; - } else if (e instanceof IOException) { - throw (IOException)e; - } else { - throw new YarnException(e); + @Override + public boolean shouldRetryOn(Exception e) { + // Only retry on connection exceptions + return e instanceof ConnectException || + e.getCause() instanceof ConnectException; } - } + }; + connectionRetry.retryOn(tokenRetryOp); } private void putObjects( @@ -437,17 +398,15 @@ private void putObjects( .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .put(ClientResponse.class, obj); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw new IOException(re); + } catch (ClientHandlerException e) { + String msg = "Failed to get the response from the timeline server."; + LOG.error(msg, e); + throw new IOException(e.getCause()); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = "Response from the timeline server is " + - ((resp == null) ? "null": + String msg = "Response from the timeline server is " + + ((resp == null) ? "null": "not successful," + " HTTP error code: " + resp.getStatus() + ", Server response:\n" + resp.getEntity(String.class)); LOG.error(msg); @@ -608,7 +567,6 @@ public boolean shouldRetryOn(Exception e) { return connectionRetry.retryOn(tokenRetryOp); } - // Old timeline service, no external retry logic. @Private @VisibleForTesting public ClientResponse doPostingObject(Object object, String path) { @@ -627,24 +585,6 @@ public ClientResponse doPostingObject(Object object, String path) { } } - /** - * Poll TimelineServiceAddress for maximum of retries times if it is null - * @param retries - * @return the left retry times - */ - private int pollTimelineServiceAddress(int retries) { - while (timelineServiceAddress == null && retries > 0) { - try { - Thread.sleep(this.serviceRetryInterval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - timelineServiceAddress = getTimelineServiceAddress(); - retries--; - } - return retries; - } - private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index c8b9625..f8830d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -66,6 +68,58 @@ public static void tearDownClass() throws Exception { } @Test + public void testClientRetryFailure() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + // Creating an invalid address + String host = collectorManager.getRestServerBindAddress().split(":")[0]; + int port = Integer.parseInt( + collectorManager.getRestServerBindAddress().split(":")[1]) + 1; + String[] serviceAddresses = new String[] { null, host + ":" + port }; + for (String serviceAddress : serviceAddresses) { + TimelineClient client = + TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + try { + client.setTimelineServiceAddress(serviceAddress); + client.init(conf); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + fail(); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("Connection retries limit exceeded.")); + } finally { + client.stop(); + } + } + } + + @Test + public void testClientRetryInvalid() throws Exception { + // Non-OK response shouldn't be retried. + TimelineClient client = + TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 2)); + try { + client.setTimelineServiceAddress( + collectorManager.getRestServerBindAddress()); + client.init(new YarnConfiguration()); + client.start(); + TimelineEntity entity = new TimelineEntity(); + entity.setType("test entity type"); + entity.setId("test entity id"); + client.putEntities(entity); + fail(); + } catch (YarnException e) { + assertTrue(e.getMessage().contains( + "Response from the timeline server is not successful,")); + } finally { + client.stop(); + } + } + + @Test public void testPutEntities() throws Exception { TimelineClient client = TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));