From e0943853f7b1e0ac6f71df6243caf182aba7380f Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Thu, 28 Feb 2019 13:12:15 +0530 Subject: [PATCH] YARN-8499 --- .../storage/HBaseTimelineReaderImpl.java | 70 ++++---------- .../storage/TimelineStorageMonitor.java | 102 +++++++++++++++++++++ 2 files changed, 119 insertions(+), 53 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index fadfd14..fe49527 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -20,10 +20,6 @@ import java.io.IOException; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -55,10 +51,10 @@ private Connection conn; private Configuration monitorHBaseConf = null; private 
Connection monitorConn; - private ScheduledExecutorService monitorExecutorService; private TimelineReaderContext monitorContext; private long monitorInterval; - private AtomicBoolean hbaseDown = new AtomicBoolean(); + private TimelineStorageMonitor storageMonitor; + private static String storageName = "HBase"; public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -86,20 +82,22 @@ public void serviceInit(Configuration conf) throws Exception { monitorInterval); monitorHBaseConf.setInt("zookeeper.recovery.retry", 1); monitorConn = ConnectionFactory.createConnection(monitorHBaseConf); - - monitorExecutorService = Executors.newScheduledThreadPool(1); - hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); conn = ConnectionFactory.createConnection(hbaseConf); + + storageMonitor = new TimelineStorageMonitor(conf, storageName) { + void doHealthCheckup() throws Exception { + TimelineEntityReader reader = TimelineEntityReaderFactory. + createMultipleEntitiesReader(monitorContext, MONITOR_FILTERS, + DATA_TO_RETRIEVE); + reader.readEntities(monitorHBaseConf, monitorConn); + } + }; } @Override protected void serviceStart() throws Exception { super.serviceStart(); - LOG.info("Scheduling HBase liveness monitor at interval {}", - monitorInterval); - monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0, - monitorInterval, TimeUnit.MILLISECONDS); } @Override @@ -108,31 +106,20 @@ protected void serviceStop() throws Exception { LOG.info("closing the hbase Connection"); conn.close(); } - if (monitorExecutorService != null) { - monitorExecutorService.shutdownNow(); - if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.warn("failed to stop the monitir task in time. 
" + - "will still proceed to close the monitor."); - } - } + storageMonitor.stop(); monitorConn.close(); super.serviceStop(); } - private void checkHBaseDown() throws IOException { - if (hbaseDown.get()) { - throw new IOException("HBase is down"); - } - } - public boolean isHBaseDown() { - return hbaseDown.get(); + return storageMonitor.isStorageDown(); } + @Override public TimelineEntity getEntity(TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(context, dataToRetrieve); @@ -143,7 +130,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, public Set getEntities(TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(context, filters, dataToRetrieve); @@ -153,7 +140,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, @Override public Set getEntityTypes(TimelineReaderContext context) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); EntityTypeReader reader = new EntityTypeReader(context); return reader.readEntityTypes(hbaseConf, conn); } @@ -163,27 +150,4 @@ public TimelineEntity getEntity(TimelineReaderContext context, protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE = new TimelineDataToRetrieve(null, null, null, null, null, null); - private class HBaseMonitor implements Runnable { - @Override - public void run() { - try { - LOG.info("Running HBase liveness monitor"); - TimelineEntityReader reader = - TimelineEntityReaderFactory.createMultipleEntitiesReader( - monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE); - reader.readEntities(monitorHBaseConf, monitorConn); - - // on success, reset hbase down flag - if 
(hbaseDown.getAndSet(false)) { - if(LOG.isDebugEnabled()) { - LOG.debug("HBase request succeeded, assuming HBase up"); - } - } - } catch (Exception e) { - LOG.warn("Got failure attempting to read from timeline storage, " + - "assuming HBase down", e); - hbaseDown.getAndSet(true); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java new file mode 100644 index 0000000..b22ba9c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * This abstract class is for monitoring Health of Timeline Storage. + */ +public abstract class TimelineStorageMonitor { + private static final Logger LOG = LoggerFactory + .getLogger(TimelineStorageMonitor.class); + + private ScheduledExecutorService monitorExecutorService; + private long monitorInterval; + private String storageName; + private AtomicBoolean storageDown = new AtomicBoolean(); + + public TimelineStorageMonitor(Configuration conf, String storageName) { + this.storageName = storageName; + this.monitorInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS + ); + LOG.info("Scheduling {} monitor at interval {}", + this.storageName, monitorInterval); + monitorExecutorService = Executors.newScheduledThreadPool(1); + monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0, + monitorInterval, TimeUnit.MILLISECONDS); + } + + abstract void doHealthCheckup() throws Exception; + + public void checkStorageDown() throws IOException { + if (storageDown.get()) { + throw new IOException(storageName + " is down"); + } + } + + public boolean isStorageDown() { + return storageDown.get(); + } + + public void stop() throws Exception { + if (monitorExecutorService != null) { + monitorExecutorService.shutdownNow(); + if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Failed to stop the monitor task in time. 
" + + "will still proceed to close the monitor."); + } + } + } + + private class MonitorThread implements Runnable { + @Override + public void run() { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Running Timeline Storage monitor"); + } + doHealthCheckup(); + if (storageDown.getAndSet(false)) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} health checkup succeeded, " + + "assuming storage up", storageName); + } + } + } catch (Exception e) { + LOG.warn("Got failure attempting to read from {}, " + + "assuming Storage down", storageName, e); + storageDown.getAndSet(true); + } + } + } + +} -- 2.7.4 (Apple Git-66)