From 6024beb6074c0fe73e0bd43ee384c0c5d3754b56 Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Wed, 27 Feb 2019 22:29:42 +0530 Subject: [PATCH] YARN-8499 --- .../storage/HBaseTimelineReaderImpl.java | 65 ++++----------- .../storage/TimelineStorageMonitor.java | 95 ++++++++++++++++++++++ 2 files changed, 111 insertions(+), 49 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index fadfd14..df9a5e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -55,10 +55,10 @@ private Connection conn; private Configuration monitorHBaseConf = null; private Connection monitorConn; - private ScheduledExecutorService monitorExecutorService; private TimelineReaderContext monitorContext; private long monitorInterval; - private AtomicBoolean hbaseDown = new AtomicBoolean(); + private TimelineStorageMonitor storageMonitor; + private static String storageName = "HBase"; public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -86,9 +86,6 @@ public void serviceInit(Configuration conf) throws Exception { monitorInterval); monitorHBaseConf.setInt("zookeeper.recovery.retry", 1); monitorConn = ConnectionFactory.createConnection(monitorHBaseConf); - - monitorExecutorService = Executors.newScheduledThreadPool(1); - hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); conn = ConnectionFactory.createConnection(hbaseConf); } @@ -96,10 +93,14 @@ public void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { super.serviceStart(); - LOG.info("Scheduling HBase liveness monitor at interval {}", - monitorInterval); - monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0, - monitorInterval, TimeUnit.MILLISECONDS); + storageMonitor = new TimelineStorageMonitor(storageName) { + void doHealthCheckup() throws Exception { + TimelineEntityReader reader = TimelineEntityReaderFactory. + createMultipleEntitiesReader(monitorContext, MONITOR_FILTERS, + DATA_TO_RETRIEVE); + reader.readEntities(monitorHBaseConf, monitorConn); + } + }; } @Override @@ -108,31 +109,20 @@ protected void serviceStop() throws Exception { LOG.info("closing the hbase Connection"); conn.close(); } - if (monitorExecutorService != null) { - monitorExecutorService.shutdownNow(); - if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.warn("failed to stop the monitir task in time. " + - "will still proceed to close the monitor."); - } - } + storageMonitor.stop(); monitorConn.close(); super.serviceStop(); } - private void checkHBaseDown() throws IOException { - if (hbaseDown.get()) { - throw new IOException("HBase is down"); - } - } - public boolean isHBaseDown() { - return hbaseDown.get(); + return storageMonitor.isStorageDown(); } + @Override public TimelineEntity getEntity(TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(context, dataToRetrieve); @@ -143,7 +133,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, public Set getEntities(TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(context, filters, dataToRetrieve); @@ -153,7 +143,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, @Override public Set getEntityTypes(TimelineReaderContext context) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageDown(); EntityTypeReader reader = new EntityTypeReader(context); return reader.readEntityTypes(hbaseConf, conn); } @@ -163,27 +153,4 @@ public TimelineEntity getEntity(TimelineReaderContext context, protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE = new TimelineDataToRetrieve(null, null, null, null, null, null); - private class HBaseMonitor implements Runnable { - @Override - public void run() { - try { - LOG.info("Running HBase liveness monitor"); - TimelineEntityReader reader = - TimelineEntityReaderFactory.createMultipleEntitiesReader( - monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE); - reader.readEntities(monitorHBaseConf, monitorConn); - - // on success, reset hbase down flag - if (hbaseDown.getAndSet(false)) { - if(LOG.isDebugEnabled()) { - LOG.debug("HBase request succeeded, assuming HBase up"); - } - } - } catch (Exception e) { - LOG.warn("Got failure attempting to read from timeline storage, " + - "assuming HBase down", e); - hbaseDown.getAndSet(true); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java new file mode 100644 index 0000000..69e5a69 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This abstract class is for monitoring Health of Timeline Storage. + */ +public abstract class TimelineStorageMonitor { + private static final Logger LOG = LoggerFactory + .getLogger(TimelineStorageMonitor.class); + + private ScheduledExecutorService monitorExecutorService; + private long monitorInterval; + private String storageName; + private AtomicBoolean storageDown = new AtomicBoolean(); + + public TimelineStorageMonitor(String storageName) { + this.storageName = storageName; + LOG.info("Scheduling {} monitor at interval {}", + this.storageName, monitorInterval); + monitorExecutorService = Executors.newScheduledThreadPool(1); + monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0, + monitorInterval, TimeUnit.MILLISECONDS); + } + + abstract void doHealthCheckup() throws Exception; + + public void checkStorageDown() throws IOException { + if (storageDown.get()) { + throw new IOException(storageName + " is down"); + } + } + + public boolean isStorageDown() { + return storageDown.get(); + } + + public void stop() throws Exception { + if (monitorExecutorService != null) { + monitorExecutorService.shutdownNow(); + if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Failed to stop the monitor task in time. " + + "will still proceed to close the monitor."); + } + } + } + + private class MonitorThread implements Runnable { + @Override + public void run () { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Running Timeline Storage monitor"); + } + doHealthCheckup(); + if (storageDown.getAndSet(false)) { + if (LOG.isDebugEnabled()) { + LOG.debug("{} health checkup succeeded, " + + "assuming storage up", storageName); + } + } + } catch (Exception e) { + LOG.warn("Got failure attempting to read from {}, " + + "assuming Storage down", storageName, e); + storageDown.getAndSet(true); + } + } + } + +} -- 2.7.4 (Apple Git-66)