From 68390d25da33a002935f7fa148579cc7b8802f72 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Mon, 9 Sep 2019 22:46:46 +0530 Subject: [PATCH] YARN-9822. Prevent locking TimelineWriter when TimelineStorage is down. --- .../yarn/api/records/timeline/TimelineHealth.java | 4 +- .../DocumentStoreTimelineReaderImpl.java | 4 +- .../DocumentStoreTimelineWriterImpl.java | 8 +- .../storage/HBaseTimelineReaderImpl.java | 2 +- .../storage/HBaseTimelineWriterImpl.java | 14 ++++ .../collector/TimelineCollector.java | 85 +++++++++++++++++++--- .../storage/FileSystemTimelineReaderImpl.java | 2 +- .../storage/FileSystemTimelineWriterImpl.java | 15 ++++ .../storage/NoOpTimelineWriterImpl.java | 7 ++ .../timelineservice/storage/TimelineWriter.java | 10 +++ .../collector/TestTimelineCollector.java | 66 ++++++++++++++++- 11 files changed, 196 insertions(+), 21 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java index d592167..3462441 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java @@ -39,12 +39,12 @@ * Timline health status. 
* * RUNNING - Service is up and running - * READER_CONNECTION_FAULURE - isConnectionAlive() of reader implementation + * CONNECTION_FAILURE - isConnectionAlive() of reader / writer implementation * reported an error */ public enum TimelineHealthStatus { RUNNING, - READER_CONNECTION_FAILURE + CONNECTION_FAILURE } private TimelineHealthStatus healthStatus; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java index 8de3b86..7d8098e 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java @@ -108,7 +108,7 @@ public TimelineHealth getHealthStatus() { ""); } else { return new TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, "Timeline store reader not initialized."); } } @@ -131,4 +131,4 @@ public TimelineHealth getHealthStatus() { } return timelineEntities; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java index 572d888..0ea70f9 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack; @@ -151,6 +152,11 @@ public TimelineWriteResponse write(TimelineCollectorContext context, return null; } + @Override + public TimelineHealth getHealthStatus() { + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, ""); + } + private void appendSubAppUserIfExists(TimelineCollectorContext context, String subApplicationUser) { String userId = context.getUserId(); @@ -282,4 +288,4 @@ public TimelineWriteResponse aggregate(TimelineEntity data, @Override public void flush() { } -} \ No newline at end of file +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index f3592d2..b6e1d76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -115,7 +115,7 @@ public TimelineHealth getHealthStatus() { ""); } catch (IOException e){ return new TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, "HBase connection is down"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index dda004d..7233dab 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; @@ -604,6 +605,19 @@ public TimelineWriteResponse aggregate(TimelineEntity data, return null; } + @Override + public TimelineHealth getHealthStatus() { + try { + storageMonitor.checkStorageIsUp(); + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + ""); + } catch (IOException e){ + return new TimelineHealth( + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, + "HBase connection is down"); + } + } + /* * (non-Javadoc) * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 0c54ed0..1c1fe11 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -71,6 +72,9 @@ private volatile boolean isStopped = false; + private int maxWriteRetries; + private long writeRetryInterval; + public TimelineCollector(String name) { super(name); } @@ -86,6 +90,13 @@ protected void serviceInit(Configuration conf) throws Exception { new ArrayBlockingQueue<>(capacity)); pool.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy()); + + maxWriteRetries = + conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + writeRetryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); } @Override @@ -153,18 +164,54 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi); - TimelineWriteResponse response; - // synchronize on the writer object so that no other threads can - // flush the writer buffer concurrently and swallow any exception - // caused by the timeline enitites that are 
being put here. - synchronized (writer) { - response = writeTimelineEntities(entities, callerUgi); - flushBufferedTimelineEntities(); + TimelineWriteResponse response = null; + try { + boolean isStorageUp = checkRetryWithSleep(); + if (isStorageUp) { + // synchronize on the writer object so that no other threads can + // flush the writer buffer concurrently and swallow any exception + // caused by the timeline entities that are being put here. + synchronized (writer) { + response = writeTimelineEntities(entities, callerUgi); + flushBufferedTimelineEntities(); + } + } else { + String msg = String.format("Failed to putEntities(" + + "entities=%s, callerUgi=%s) as Timeline Storage is Down", + entities, callerUgi); + throw new IOException(msg); + } + } catch (InterruptedException ex) { + String msg = String.format("Interrupted while retrying to putEntities(" + + "entities=%s, callerUgi=%s)", entities, callerUgi); + throw new IOException(msg, ex); } return response; } + + /** Probes the writer at least once; sleeps only between failed probes. */ + private boolean checkRetryWithSleep() throws InterruptedException { + int retries = Math.max(1, maxWriteRetries); + while (true) { + TimelineHealth timelineHealth = writer.getHealthStatus(); + if (timelineHealth != null && timelineHealth.getHealthStatus() + == TimelineHealth.TimelineHealthStatus.RUNNING) { + return true; + } + if (--retries <= 0) { + return false; + } + try { + Thread.sleep(writeRetryInterval); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } + } + } + /** * Add or update an domain. If the domain already exists, only the owner * and the admin can update it. 
@@ -179,11 +226,25 @@ public TimelineWriteResponse putDomain(TimelineDomain domain, UserGroupInformation callerUgi) throws IOException { LOG.debug("putDomain(domain={}, callerUgi={})", domain, callerUgi); - TimelineWriteResponse response; - synchronized (writer) { - final TimelineCollectorContext context = getTimelineEntityContext(); - response = writer.write(context, domain); - flushBufferedTimelineEntities(); + TimelineWriteResponse response = null; + try { + boolean isStorageUp = checkRetryWithSleep(); + if (isStorageUp) { + synchronized (writer) { + final TimelineCollectorContext context = getTimelineEntityContext(); + response = writer.write(context, domain); + flushBufferedTimelineEntities(); + } + } else { + String msg = String.format("Failed to putDomain(" + + "domain=%s, callerUgi=%s) as Timeline Storage is Down", + domain, callerUgi); + throw new IOException(msg); + } + } catch (InterruptedException ex) { + String msg = String.format("Interrupted while retrying to putDomain(" + + "domain=%s, callerUgi=%s)", domain, callerUgi); + throw new IOException(msg, ex); } return response; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index ea7c32e..bb06d84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -454,7 +454,7 @@ public TimelineHealth getHealthStatus() { fs.exists(rootPath); } 
catch (IOException e) { return new TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, e.getMessage() ); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index 023d496..410a651 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -194,6 +195,20 @@ public void flush() throws IOException { // no op } + @Override + public TimelineHealth getHealthStatus() { + try { + fs.exists(rootPath); + } catch (IOException e) { + return new TimelineHealth( + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, + e.getMessage() + ); + } + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + ""); + } + private void mkdirs(Path... 
paths) throws IOException, InterruptedException { for (Path path: paths) { if (!existsWithRetries(path)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java index 48b3348..fd31209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java @@ -19,6 +19,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -77,4 +78,10 @@ public TimelineWriteResponse aggregate(TimelineEntity data, public void flush() throws IOException { LOG.debug("NoOpTimelineWriter is configured. Ignoring flush call"); } + + @Override + public TimelineHealth getHealthStatus() { + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + "NoOpTimelineWriter is configured. 
"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 08cfc8b..ccc7491 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -95,4 +96,13 @@ TimelineWriteResponse aggregate(TimelineEntity data, * entities to the backend storage. */ void flush() throws IOException; + + /** + * Check if writer connection is working properly. + * + * @return True if writer connection works as expected, false otherwise. 
+ */ + TimelineHealth getHealthStatus(); + + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 766c2cd..3fc6610 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -155,7 +156,17 @@ public void testAggregation() throws Exception { @Test public void testPutEntity() throws IOException { TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. 
+ TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); TimelineEntities entities = generateTestEntities(1, 1); collector.putEntities( @@ -166,6 +177,36 @@ public void testPutEntity() throws IOException { verify(writer, times(1)).flush(); } + + @Test + public void testPutEntityWithStorageDown() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. + TimelineHealthStatus.CONNECTION_FAILURE, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); + + TimelineEntities entities = generateTestEntities(1, 1); + boolean exceptionCaught = false; + try { + collector.putEntities(entities, UserGroupInformation. + createRemoteUser("test-user")); + } catch (Exception e) { + if (e.getMessage().contains("Failed to putEntities")) { + exceptionCaught = true; + } + } + assertTrue("TimelineCollector putEntity failed to " + + "handle storage down", exceptionCaught); + } + /** * Test TimelineCollector's interaction with TimelineWriter upon * putEntityAsync() calls. @@ -222,7 +263,17 @@ public void testAsyncEntityDiscard() throws Exception { */ @Test public void testPutDomain() throws IOException { TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. 
+ TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); TimelineDomain domain = generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L, @@ -287,8 +338,19 @@ public TimelineCollectorContext getTimelineEntityContext() { 1L, ApplicationId.newInstance(ts, 1).toString()); } }; - collector.init(new Configuration()); - collector.setWriter(mock(TimelineWriter.class)); + + TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. + TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + + collector.init(conf); + collector.setWriter(writer); // Put 5 entities with different metric values. TimelineEntities entities = new TimelineEntities(); -- 2.7.4 (Apple Git-66)