diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f21fb7d3dd..54472cb3bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -860,6 +860,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
         }
       }
       replLogger.endLog(bootDumpBeginReplId.toString());
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
     }
     Long bootDumpEndReplId = currentNotificationId(hiveDb);
     LOG.info("Preparing to return {},{}->{}",
@@ -870,7 +871,6 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
 
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
-    work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
 
     return bootDumpBeginReplId;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
index a856c76319..07f2916d67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java
@@ -64,8 +64,10 @@ public static MetricSink getInstance() {
   public synchronized void init(HiveConf conf) {
     if (!isInitialised) {
       this.conf = conf;
-      this.executorService.schedule(new MetricSinkWriter(conf), getFrequencyInSecs(), TimeUnit.SECONDS);
+      this.executorService.scheduleAtFixedRate(new MetricSinkWriter(conf), 0,
+        getFrequencyInSecs(), TimeUnit.SECONDS);
       isInitialised = true;
+      LOG.debug("Metrics Sink Initialised with frequency {} ", getFrequencyInSecs());
     }
   }
 
@@ -102,10 +104,12 @@ public synchronized void tearDown() {
     public void run() {
       ReplicationMetricList metricList = new ReplicationMetricList();
       try {
+        LOG.debug("Updating metrics to DB");
         // get metrics
         LinkedList<ReplicationMetric> metrics = collector.getMetrics();
         //Move metrics to thrift list
         if (metrics.size() > 0) {
+          LOG.debug("Converting metrics to thrift metrics {} ", metrics.size());
           int totalMetricsSize = metrics.size();
           List<ReplicationMetrics> replicationMetricsList = new ArrayList<>(totalMetricsSize);
           for (int index = 0; index < totalMetricsSize; index++) {
@@ -120,25 +124,24 @@ public void run() {
             replicationMetricsList.add(persistentMetric);
           }
           metricList.setReplicationMetricList(replicationMetricsList);
+          // write metrics and retry if fails
+          Retry<Void> retriable = new Retry<Void>(Exception.class) {
+            @Override
+            public Void execute() throws Exception {
+              //write
+              if (metricList.getReplicationMetricListSize() > 0) {
+                LOG.debug("Persisting metrics to DB {} ", metricList.getReplicationMetricListSize());
+                Hive.get(conf).getMSC().addReplicationMetrics(metricList);
+              }
+              return null;
+            }
+          };
+          retriable.run();
+        } else {
+          LOG.debug("No Metrics to Update ");
         }
       } catch (Exception e) {
-        throw new RuntimeException("Metrics are not getting persisted", e);
-      }
-      // write metrics and retry if fails
-      Retry<Void> retriable = new Retry<Void>(Exception.class) {
-        @Override
-        public Void execute() throws Exception {
-          //write
-          if (metricList.getReplicationMetricListSize() > 0) {
-            Hive.get(conf).getMSC().addReplicationMetrics(metricList);
-          }
-          return null;
-        }
-      };
-      try {
-        retriable.run();
-      } catch (Exception e) {
-        throw new RuntimeException("Metrics are not getting persisted to HMS", e);
+        LOG.error("Metrics are not getting persisted", e);
       }
     }
   }
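Why the two MetricSink changes above belong together: schedule() fires the writer exactly once, while scheduleAtFixedRate() re-runs it every getFrequencyInSecs() seconds, which is what a continuously flushing sink needs; and because a fixed-rate task that throws has all of its subsequent executions suppressed by the executor, the writer must log failures rather than rethrow them. A minimal standalone sketch of that behavior (not part of the patch; the class and messages are invented for illustration):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateFailureDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    pool.scheduleAtFixedRate(() -> {
      try {
        // Simulate the persistence step failing on every run.
        throw new RuntimeException("simulated HMS write failure");
      } catch (RuntimeException e) {
        // Catch-and-log keeps the schedule alive; if this exception escaped
        // the task body, the executor would suppress every later execution.
        System.out.println("logged, schedule survives: " + e.getMessage());
      }
    }, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(3500); // prints roughly four times despite the failures
    pool.shutdownNow();
  }
}

This mirrors the patched run() method: the RuntimeException rethrow is replaced by LOG.error, so one failed flush no longer kills the writer's schedule.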
persisted", e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java index f97332cbbe..61cc34881d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @@ -34,6 +36,7 @@ * Abstract class for Replication Metric Collection. */ public abstract class ReplicationMetricCollector { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetricCollector.class); private ReplicationMetric replicationMetric; private MetricCollector metricCollector; private boolean isEnabled; @@ -53,7 +56,9 @@ public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replic public void reportStageStart(String stageName, Map metricMap) throws SemanticException { if (isEnabled) { + LOG.debug("Stage Started {}, {}, {}", stageName, metricMap.size(), metricMap ); Progress progress = replicationMetric.getProgress(); + progress.setStatus(Status.IN_PROGRESS); Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis()); for (Map.Entry metric : metricMap.entrySet()) { stage.addMetric(new Metric(metric.getKey(), metric.getValue())); @@ -67,6 +72,7 @@ public void reportStageStart(String stageName, Map metricMap) thro public void reportStageEnd(String stageName, Status status, long lastReplId) throws SemanticException { if (isEnabled) { + LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId ); Progress progress = replicationMetric.getProgress(); Stage stage = progress.getStageByName(stageName); stage.setStatus(status); @@ -81,6 +87,7 @@ public void reportStageEnd(String stageName, Status status, long lastReplId) thr public void reportStageEnd(String stageName, Status status) throws SemanticException { if (isEnabled) { + LOG.debug("Stage Ended {}, {}", stageName, status ); Progress progress = replicationMetric.getProgress(); Stage stage = progress.getStageByName(stageName); stage.setStatus(status); @@ -92,6 +99,7 @@ public void reportStageEnd(String stageName, Status status) throws SemanticExcep public void reportStageProgress(String stageName, String metricName, long count) throws SemanticException { if (isEnabled) { + LOG.debug("Stage progress {}, {}, {}", stageName, metricName, count ); Progress progress = replicationMetric.getProgress(); Stage stage = progress.getStageByName(stageName); Metric metric = stage.getMetricByName(metricName); @@ -107,6 +115,7 @@ public void reportStageProgress(String stageName, String metricName, long count) public void reportEnd(Status status) throws SemanticException { if (isEnabled) { + LOG.info("End {}", status ); Progress progress = replicationMetric.getProgress(); progress.setStatus(status); replicationMetric.setProgress(progress); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java index d3ad8fb963..dfed9c2548 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java +++ 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
index d3ad8fb963..dfed9c2548 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
@@ -127,6 +128,63 @@ public void testSuccessBootstrapDumpMetrics() throws Exception {
     actualMetric.setProgress(progress);
     checkSuccess(actualMetric, expectedMetric, "dump",
       Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+
+    //Incremental
+    conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "2");
+    ReplicationMetricCollector incrementDumpMetricCollector = new IncrementalDumpMetricCollector(
+      "testAcidTablesReplLoadBootstrapIncr_1592205875387",
+      "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
+        + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive", conf);
+    metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10);
+    incrementDumpMetricCollector.reportStageStart("dump", metricMap);
+    incrementDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
+    incrementDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10);
+    incrementDumpMetricCollector.reportEnd(Status.SUCCESS);
+
+    expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
+      Metadata.ReplicationType.INCREMENTAL, "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_"
+        + "parse_TestReplicationScenarios_245261428230295/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGlu"
+        + "Y3JfMTU5MjIwNTg3NTM4Nw==/0/hive");
+    expectedMetadata.setLastReplId(10);
+    expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedEventsMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 10);
+    expectedEventsMetric.setCurrentCount(10);
+    dumpStage.addMetric(expectedEventsMetric);
+    expectedProgress.addStage(dumpStage);
+    expectedMetric = new ReplicationMetric(2, "repl", 0,
+      expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    Thread.sleep(1000 * 20);
+    metricsRequest = new GetReplicationMetricsRequest();
+    metricsRequest.setPolicy("repl");
+    actualReplicationMetrics = Hive.get(conf).getMSC().getReplicationMetrics(metricsRequest);
+    Assert.assertEquals(2, actualReplicationMetrics.getReplicationMetricListSize());
+    actualThriftMetric = actualReplicationMetrics.getReplicationMetricList().get(0);
+    mapper = new ObjectMapper();
+    actualMetric = new ReplicationMetric(actualThriftMetric.getScheduledExecutionId(),
+      actualThriftMetric.getPolicy(), actualThriftMetric.getDumpExecutionId(),
+      mapper.readValue(actualThriftMetric.getMetadata(), Metadata.class));
+    progressMapper = mapper.readValue(actualThriftMetric.getProgress(), ProgressMapper.class);
+    progress = new Progress();
+    progress.setStatus(progressMapper.getStatus());
+    for (StageMapper stageMapper : progressMapper.getStages()) {
+      Stage stage = new Stage();
+      stage.setName(stageMapper.getName());
+      stage.setStatus(stageMapper.getStatus());
+      stage.setStartTime(stageMapper.getStartTime());
+      stage.setEndTime(stageMapper.getEndTime());
+      for (Metric metric : stageMapper.getMetrics()) {
+        stage.addMetric(metric);
+      }
+      progress.addStage(stage);
+    }
+    actualMetric.setProgress(progress);
+    checkSuccessIncremental(actualMetric, expectedMetric, "dump",
+      Arrays.asList(ReplUtils.MetricName.EVENTS.name()));
   }
 
   private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, String stageName,
@@ -150,4 +208,25 @@ private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected,
     }
   }
 
+  private void checkSuccessIncremental(ReplicationMetric actual, ReplicationMetric expected, String stageName,
+                                       List<String> metricNames) {
+    Assert.assertEquals(expected.getDumpExecutionId(), actual.getDumpExecutionId());
+    Assert.assertEquals(expected.getPolicy(), actual.getPolicy());
+    Assert.assertEquals(expected.getScheduledExecutionId(), actual.getScheduledExecutionId());
+    Assert.assertEquals(expected.getMetadata().getReplicationType(), actual.getMetadata().getReplicationType());
+    Assert.assertEquals(expected.getMetadata().getDbName(), actual.getMetadata().getDbName());
+    Assert.assertEquals(expected.getMetadata().getStagingDir(), actual.getMetadata().getStagingDir());
+    Assert.assertEquals(expected.getMetadata().getLastReplId(), actual.getMetadata().getLastReplId());
+    Assert.assertEquals(expected.getProgress().getStatus(), actual.getProgress().getStatus());
+    Assert.assertEquals(expected.getProgress().getStageByName(stageName).getStatus(),
+      actual.getProgress().getStageByName(stageName).getStatus());
+    for (String metricName : metricNames) {
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount(),
+        actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount());
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName)
+        .getCurrentCount(), actual.getProgress()
+        .getStageByName(stageName).getMetricByName(metricName).getCurrentCount());
+    }
+  }
+
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
index 4ba968f76b..1df99e09ae 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ReplicationMetricsMaintTask.java
@@ -63,10 +63,11 @@ public void run() {
       if (!MetastoreConf.getBoolVar(conf, ConfVars.SCHEDULED_QUERIES_ENABLED)) {
         return;
       }
+      LOG.debug("Cleaning up older Metrics");
       RawStore ms = HiveMetaStore.HMSHandler.getMSForConf(conf);
       int maxRetainSecs = (int) TimeUnit.DAYS.toSeconds(MetastoreConf.getTimeVar(conf,
         ConfVars.REPL_METRICS_MAX_AGE, TimeUnit.DAYS));
-      int deleteCnt = ms.deleteScheduledExecutions(maxRetainSecs);
+      int deleteCnt = ms.deleteReplicationMetrics(maxRetainSecs);
      if (deleteCnt > 0L){
        LOG.info("Number of deleted entries: " + deleteCnt);
      }
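The MetricSink writer leans on Hive's Retry helper, whose internals are not shown in this patch; only the anonymous-subclass usage, new Retry(Exception.class) { ... }.run(), is visible. A minimal sketch of that pattern under assumed semantics (the class name, attempt count, and exception matching below are illustrative, not Hive's actual implementation):

// Hypothetical stand-in for the Retry helper used in MetricSink above.
public abstract class SimpleRetry<T> {
  private final Class<? extends Exception> retryOn;
  private final int maxAttempts;

  protected SimpleRetry(Class<? extends Exception> retryOn, int maxAttempts) {
    this.retryOn = retryOn;
    this.maxAttempts = maxAttempts;
  }

  // Overridden by the caller with the side-effecting work, e.g. the HMS write.
  public abstract T execute() throws Exception;

  public T run() throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return execute();
      } catch (Exception e) {
        if (!retryOn.isInstance(e)) {
          throw e; // not a retriable failure type
        }
        last = e;
      }
    }
    throw last; // attempts exhausted; surfaces to the caller's catch block
  }
}

In the patched MetricSinkWriter.run(), whatever run() ultimately throws after the retries lands in the surrounding catch block and is logged rather than rethrown, keeping the fixed-rate schedule intact.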