diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 641df005ed..d290f70eaa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -69,6 +69,9 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
@@ -429,8 +432,17 @@ private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIn
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    ReplicationMetricCollector metricCollector;
+    if (isIncrementalDump) {
+      metricCollector = new IncrementalLoadMetricCollector(replicadb, tuple.dumpLocation, "",
+          0, 0, 1);
+    } else {
+      metricCollector = new BootstrapLoadMetricCollector(replicadb, tuple.dumpLocation, "",
+          0, 0, 1);
+    }
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
-        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
+        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+        0L, metricCollector);
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
     replLoadTask.executeTask(null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index a13d842183..1aa665da56 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -26,11 +26,20 @@
 import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Before;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -41,6 +50,8 @@
 import java.util.Base64;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
 
 /**
@@ -172,7 +183,12 @@ public void testAcidTablesReplLoadBootstrapIncr() throws Throwable {
   public void testExternalTablesReplLoadBootstrapIncr() throws Throwable {
     // Bootstrap
     String withClause = " WITH('" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname
-        + "'='/replica_external_base')";
+        + "'='/replica_external_base', '" + HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA
+        + "' = 'true' ,'" + HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA + "' = 'true' , '"
+        + HiveConf.ConfVars.HIVE_IN_TEST + "' = 'true'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_ENDPOINT
+        + "' = 'http://localhost:21000/atlas'" + ",'"+ HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB + "' = 'tgt'"
+        + ",'"+ HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME + "' = 'cluster0'"
+        + ",'"+ HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME + "' = 'cluster1')";
     primary.run("use " + primaryDbName)
         .run("create external table t2 (id int)")
         .run("insert into t2 values(1)")
@@ -183,7 +199,7 @@
     ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next), true);
     primary.run("create scheduled query s1_t2 every 5 seconds as repl dump " + primaryDbName + withClause);
     replica.run("create scheduled query s2_t2 every 5 seconds as repl load " + primaryDbName + " INTO "
-        + replicatedDbName);
+        + replicatedDbName + withClause);
     Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR),
         Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name())));
     FileSystem fs = FileSystem.get(dumpRoot.toUri(), primary.hiveConf);
@@ -196,7 +212,20 @@
         .verifyResult("t2")
         .run("select id from t2 order by id")
         .verifyResults(new String[]{"1", "2"});
-
+    long lastReplId = Long.parseLong(primary.status(replicatedDbName).getOutput().get(0));
+    DumpMetaData dumpMetaData = new DumpMetaData(ackPath.getParent(), primary.hiveConf);
+    List<ReplicationMetric> replicationMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(2, replicationMetrics.size());
+    // Generate expected metrics
+    List<ReplicationMetric> expectedReplicationMetrics = new ArrayList<>();
+    expectedReplicationMetrics.add(generateExpectedMetric("s1_t2", 0, primaryDbName,
+        Metadata.ReplicationType.BOOTSTRAP, ackPath.getParent().toString(), lastReplId, Status.SUCCESS,
+        generateDumpStages(true)));
+    expectedReplicationMetrics.add(generateExpectedMetric("s2_t2",
+        dumpMetaData.getDumpExecutionId(), replicatedDbName,
+        Metadata.ReplicationType.BOOTSTRAP, ackPath.getParent().toString(), lastReplId, Status.SUCCESS,
+        generateLoadStages(true)));
+    checkMetrics(expectedReplicationMetrics, replicationMetrics);
     // First incremental, after bootstrap
     primary.run("use " + primaryDbName)
         .run("insert into t2 values(3)")
@@ -215,6 +244,130 @@
       replica.run("drop scheduled query s2_t2");
     }
   }
+
+  private void checkMetrics(List<ReplicationMetric> expectedReplicationMetrics,
+                            List<ReplicationMetric> actualMetrics) {
+    Assert.assertEquals(expectedReplicationMetrics.size(), actualMetrics.size());
+    int metricCounter = 0;
+    for (ReplicationMetric actualMetric : actualMetrics) {
+      for (ReplicationMetric expecMetric : expectedReplicationMetrics) {
+        if (actualMetric.getPolicy().equalsIgnoreCase(expecMetric.getPolicy())) {
+          Assert.assertEquals(expecMetric.getDumpExecutionId(), actualMetric.getDumpExecutionId());
+          Assert.assertEquals(expecMetric.getMetadata().getDbName(), actualMetric.getMetadata().getDbName());
+          Assert.assertEquals(expecMetric.getMetadata().getLastReplId(),
+              actualMetric.getMetadata().getLastReplId());
+          Assert.assertEquals(expecMetric.getMetadata().getStagingDir(),
+              actualMetric.getMetadata().getStagingDir());
+          Assert.assertEquals(expecMetric.getMetadata().getReplicationType(),
+              actualMetric.getMetadata().getReplicationType());
+          Assert.assertEquals(expecMetric.getProgress().getStatus(), actualMetric.getProgress().getStatus());
+          Assert.assertEquals(expecMetric.getProgress().getStages().size(),
+              actualMetric.getProgress().getStages().size());
+          List<Stage> expectedStages = expecMetric.getProgress().getStages();
+          List<Stage> actualStages = actualMetric.getProgress().getStages();
+          int counter = 0;
+          for (Stage actualStage : actualStages) {
+            for (Stage expeStage : expectedStages) {
+              if (actualStage.getName().equalsIgnoreCase(expeStage.getName())) {
+                Assert.assertEquals(expeStage.getStatus(), actualStage.getStatus());
+                Assert.assertEquals(expeStage.getMetrics().size(), actualStage.getMetrics().size());
+                for (Metric actMetric : actualStage.getMetrics()) {
+                  for (Metric expMetric : expeStage.getMetrics()) {
+                    if (actMetric.getName().equalsIgnoreCase(expMetric.getName())) {
+                      Assert.assertEquals(expMetric.getTotalCount(), actMetric.getTotalCount());
+                      Assert.assertEquals(expMetric.getCurrentCount(), actMetric.getCurrentCount());
+                    }
+                  }
+                }
+                counter++;
+                if (counter == actualStages.size()) {
+                  break;
+                }
+              }
+            }
+          }
+          metricCounter++;
+          if (metricCounter == actualMetrics.size()) {
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  private List<Stage> generateLoadStages(boolean isBootstrap) {
+    List<Stage> stages = new ArrayList<>();
+    // Ranger
+    Stage rangerDump = new Stage("RANGER_LOAD", Status.SUCCESS, 0);
+    Metric rangerMetric = new Metric(ReplUtils.MetricName.POLICIES.name(), 0);
+    rangerDump.addMetric(rangerMetric);
+    stages.add(rangerDump);
+    // Atlas
+    Stage atlasDump = new Stage("ATLAS_LOAD", Status.SUCCESS, 0);
+    Metric atlasMetric = new Metric(ReplUtils.MetricName.TAGS.name(), 0);
+    atlasDump.addMetric(atlasMetric);
+    stages.add(atlasDump);
+    // Hive
+    Stage replDump = new Stage("REPL_LOAD", Status.SUCCESS, 0);
+    if (isBootstrap) {
+      Metric hiveMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 1);
+      hiveMetric.setCurrentCount(1);
+      replDump.addMetric(hiveMetric);
+      hiveMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
+      replDump.addMetric(hiveMetric);
+    } else {
+      Metric hiveMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 1);
+      hiveMetric.setCurrentCount(1);
+      replDump.addMetric(hiveMetric);
+    }
+    stages.add(replDump);
+    return stages;
+  }
+
+  private List<Stage> generateDumpStages(boolean isBootstrap) {
+    List<Stage> stages = new ArrayList<>();
+    // Ranger
+    Stage rangerDump = new Stage("RANGER_DUMP", Status.SUCCESS, 0);
+    Metric rangerMetric = new Metric(ReplUtils.MetricName.POLICIES.name(), 0);
+    rangerDump.addMetric(rangerMetric);
+    stages.add(rangerDump);
+    // Atlas
+    Stage atlasDump = new Stage("ATLAS_DUMP", Status.SUCCESS, 0);
+    Metric atlasMetric = new Metric(ReplUtils.MetricName.TAGS.name(), 0);
+    atlasDump.addMetric(atlasMetric);
+    stages.add(atlasDump);
+    // Hive
+    Stage replDump = new Stage("REPL_DUMP", Status.SUCCESS, 0);
+    if (isBootstrap) {
+      Metric hiveMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 1);
+      hiveMetric.setCurrentCount(1);
+      replDump.addMetric(hiveMetric);
+      hiveMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
+      replDump.addMetric(hiveMetric);
+    } else {
+      Metric hiveMetric = new Metric(ReplUtils.MetricName.EVENTS.name(), 1);
+      hiveMetric.setCurrentCount(1);
+      replDump.addMetric(hiveMetric);
+    }
+    stages.add(replDump);
+    return stages;
+  }
+
+  private ReplicationMetric generateExpectedMetric(String policy, long dumpExecId, String dbName,
+                                                   Metadata.ReplicationType replicationType, String staging,
+                                                   long lastReplId, Status status, List<Stage> stages) {
+    Metadata metadata = new Metadata(dbName, replicationType, staging);
+    metadata.setLastReplId(lastReplId);
+    ReplicationMetric replicationMetric = new ReplicationMetric(0, policy, dumpExecId, metadata);
+    Progress progress = new Progress();
+    progress.setStatus(status);
+    for (Stage stage : stages) {
+      progress.addStage(stage);
+    }
+    replicationMetric.setProgress(progress);
+    return replicationMetric;
+  }
+
   private void waitForAck(FileSystem fs, Path ackFile, long timeout) throws IOException {
     long oldTime = System.currentTimeMillis();
     long sleepInterval = 2;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index be48f99c59..9f6ccb5533 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +48,12 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
 
 /**
  * Atlas Metadata Replication Dump Task.
@@ -71,6 +73,9 @@ public int execute() {
       AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
           atlasReplInfo.getStagingDir().toString());
       replLogger.startLog();
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L);
+      work.getMetricCollector().reportStageStart(getName(), metricMap);
       atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
           .getClient(atlasReplInfo.getConf());
       AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
@@ -81,10 +86,16 @@
       LOG.debug("Finished dumping atlas metadata, total:{} bytes written", numBytesWritten);
       createDumpMetadata(atlasReplInfo, currentModifiedTime);
       replLogger.endLog(0L);
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
       return 0;
     } catch (Exception e) {
       LOG.error("Exception while dumping atlas metadata", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
index 3344152f43..3f10730be4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -34,13 +35,16 @@
   private final Path stagingDir;
   private final boolean bootstrap;
   private final Path prevAtlasDumpDir;
+  private final transient ReplicationMetricCollector metricCollector;
 
-  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) {
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir,
+                       ReplicationMetricCollector metricCollector) {
     this.srcDB = srcDB;
     this.stagingDir = stagingDir;
     this.bootstrap = bootstrap;
     this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.metricCollector = metricCollector;
   }
 
   public boolean isBootstrap() {
@@ -58,4 +62,8 @@ public String getSrcDB() {
   public Path getStagingDir() {
     return stagingDir;
   }
+
+  public ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index fa18bf3236..487afc4eef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -31,8 +31,8 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-
 import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +44,8 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Atlas Metadata Replication Load Task.
@@ -56,6 +58,9 @@ public int execute() {
     try {
       AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L);
+      work.getMetricCollector().reportStageStart(getName(), metricMap);
       LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
           atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
       AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
@@ -63,11 +68,18 @@
       replLogger.startLog();
       int importCount = importAtlasMetadata(atlasReplInfo);
       replLogger.endLog(importCount);
+      work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TAGS.name(), importCount);
       LOG.info("Atlas entities import count {}", importCount);
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
       return 0;
     } catch (Exception e) {
       LOG.error("Exception while loading atlas metadata", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
index 4dc1ea81a6..817c214675 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -33,11 +34,13 @@
   private final String srcDB;
   private final String tgtDB;
   private final Path stagingDir;
+  private final transient ReplicationMetricCollector metricCollector;
 
-  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) {
+  public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, ReplicationMetricCollector metricCollector) {
     this.srcDB = srcDB;
     this.tgtDB = tgtDB;
     this.stagingDir = stagingDir;
+    this.metricCollector = metricCollector;
   }
 
   public static long getSerialVersionUID() {
@@ -55,4 +58,8 @@ public String getTgtDB() {
   public Path getStagingDir() {
     return stagingDir;
   }
+
+  public ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index 5a56a6be95..92ca6ea6ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -33,13 +33,16 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.net.URL;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * RangerDumpTask.
@@ -77,6 +80,11 @@ public int execute() {
       long exportCount = 0;
       Path filePath = null;
       LOG.info("Exporting Ranger Metadata");
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L);
+      work.getMetricCollector().reportStageStart(getName(), metricMap);
+      replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
+      replLogger.startLog();
       if (rangerRestClient == null) {
         rangerRestClient = getRangerRestClient();
       }
@@ -91,8 +99,6 @@
       if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
         throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
       }
-      replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
-      replLogger.startLog();
       RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
           work.getDbName(), rangerHiveServiceName);
       List rangerPolicies = rangerExportPolicyList.getPolicies();
@@ -109,15 +115,22 @@
       if (filePath != null) {
         LOG.info("Ranger policy export finished successfully");
         exportCount = rangerExportPolicyList.getListSize();
+        work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), exportCount);
       }
       replLogger.endLog(exportCount);
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
       LOG.debug("Ranger policy export filePath:" + filePath);
       LOG.info("Number of ranger policies exported {}", exportCount);
       return 0;
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java
index 026402b43e..b1393b20d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 
 import java.io.Serializable;
@@ -36,10 +37,12 @@
   private static final long serialVersionUID = 1L;
   private Path currentDumpPath;
   private String dbName;
+  private final transient ReplicationMetricCollector metricCollector;
 
-  public RangerDumpWork(Path currentDumpPath, String dbName) {
+  public RangerDumpWork(Path currentDumpPath, String dbName, ReplicationMetricCollector metricCollector) {
     this.currentDumpPath = currentDumpPath;
     this.dbName = dbName;
+    this.metricCollector = metricCollector;
   }
 
   public Path getCurrentDumpPath() {
@@ -53,4 +56,8 @@ public String getDbName() {
   URL getRangerConfigResource() {
     return getClass().getClassLoader().getResource(ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
   }
+
+  public ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index 4e8a44fdae..fa57efd2fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,9 @@
 import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_ADD_DENY_POLICY_TARGET;
 
 /**
@@ -101,6 +104,9 @@ public int execute() {
       replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(),
           work.getCurrentDumpPath().toString(), expectedPolicyCount);
       replLogger.startLog();
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.POLICIES.name(), (long) expectedPolicyCount);
+      work.getMetricCollector().reportStageStart(getName(), metricMap);
       if (rangerExportPolicyList != null && !CollectionUtils.isEmpty(rangerExportPolicyList.getPolicies())) {
         rangerPolicies = rangerExportPolicyList.getPolicies();
       }
@@ -129,13 +135,20 @@
             rangerHiveServiceName);
         LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize());
         importCount = rangerExportPolicyList.getListSize();
+        work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), importCount);
         replLogger.endLog(importCount);
         LOG.info("Ranger policy import finished {} ", importCount);
       }
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
       return 0;
     } catch (Exception e) {
       LOG.error("Failed", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java
index cddca6076a..f42575b85d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,11 +41,14 @@
   private Path currentDumpPath;
   private String targetDbName;
   private String sourceDbName;
+  private final transient ReplicationMetricCollector metricCollector;
 
-  public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName) {
+  public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName,
+                        ReplicationMetricCollector metricCollector) {
     this.currentDumpPath = currentDumpPath;
     this.targetDbName = targetDbName;
     this.sourceDbName = sourceDbName;
+    this.metricCollector = metricCollector;
   }
 
   public Path getCurrentDumpPath() {
@@ -62,4 +66,8 @@ public String getSourceDbName() {
   URL getRangerConfigResource() {
     return getClass().getClassLoader().getResource(ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME);
   }
+
+  ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 046b6a00de..62d8921714 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
@@ -71,7 +73,11 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.thrift.TException;
@@ -97,6 +103,8 @@
 import java.util.LinkedList;
 import java.util.UUID;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -147,6 +155,7 @@ public int execute() {
       Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
       Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
       work.setCurrentDumpPath(currentDumpPath);
+      work.setMetricCollector(initMetricCollection(isBootstrap, hiveDumpRoot));
       if (shouldDumpAtlasMetadata()) {
         addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath);
         LOG.info("Added task to dump atlas metadata.");
@@ -174,6 +183,11 @@
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
     return 0;
@@ -183,7 +197,8 @@ private void initiateAuthorizationDumpTask() throws SemanticException {
     if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) {
       Path rangerDumpRoot = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_RANGER_BASE_DIR);
       LOG.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot);
-      RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern);
+      RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern,
+          work.getMetricCollector());
       Task rangerDumpTask = TaskFactory.get(rangerDumpWork, conf);
       if (childTasks == null) {
         childTasks = new ArrayList<>();
@@ -240,7 +255,8 @@ private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) {
     Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR);
     Path prevAtlasDumpDir = prevHiveDumpDir == null ? null
         : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
-    AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir);
+    AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir,
+        work.getMetricCollector());
     Task atlasDumpTask = TaskFactory.get(atlasDumpWork, conf);
     childTasks = new ArrayList<>();
     childTasks.add(atlasDumpTask);
@@ -253,6 +269,7 @@ private void finishRemainingTasks() throws SemanticException {
         + ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
     Utils.create(dumpAckFile, conf);
     prepareReturnValues(work.getResultValues());
+    work.getMetricCollector().reportEnd(Status.SUCCESS);
     deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
   }
@@ -449,9 +466,7 @@
     long bootDumpBeginReplId = -1;
     List managedTableCopyPaths = Collections.emptyList();
     List extTableCopyWorks = Collections.emptyList();
-
     List tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
-
     // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
     // bootstrap (See bootstrapDump() for more details. Only difference here is instead of
     // waiting for the concurrent transactions to finish, we start dumping the incremental events
@@ -465,29 +480,21 @@
           HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
       waitUntilTime = System.currentTimeMillis() + timeoutInMs;
     }
-
     // TODO : instead of simply restricting by message format, we should eventually
     // move to a jdbc-driver-stype registering of message format, and picking message
    // factory per event to decode. For now, however, since all messages have the
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
-
     work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
-
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new ReplEventFilter(work.replScope),
         new EventBoundaryFilter(work.eventFrom, work.eventTo));
-
     EventUtils.MSClientNotificationFetcher evFetcher =
         new EventUtils.MSClientNotificationFetcher(hiveDb);
-
-
     int maxEventLimit = getMaxEventAllowed(work.maxEventLimit());
     EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
         evFetcher, work.eventFrom, maxEventLimit, evFilter);
-
     lastReplId = work.eventTo;
-
     Path ackFile = new Path(dumpRoot, ReplAck.EVENTS_DUMP.toString());
     long resumeFrom = Utils.fileExists(ackFile, conf) ? getResumeFrom(ackFile) : work.eventFrom;
@@ -499,10 +506,14 @@
     String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?";
-    replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
-        evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit),
+    long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
+        maxEventLimit);
+    replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents,
         work.eventFrom, work.eventTo, maxEventLimit);
     replLogger.startLog();
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents);
+    work.getMetricCollector().reportStageStart(getName(), metricMap);
     long dumpedCount = resumeFrom - work.eventFrom;
     if (dumpedCount > 0) {
       LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount);
@@ -518,19 +529,16 @@
       dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb);
       Utils.writeOutput(String.valueOf(lastReplId), ackFile, conf);
     }
-
     replLogger.endLog(lastReplId.toString());
-
     LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
-    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
-
+    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executorId);
     // If repl policy is changed (oldReplScope is set), then pass the current replication policy,
     // so that REPL LOAD would drop the tables which are not included in current policy.
     if (work.oldReplScope != null) {
       dmd.setReplScope(work.replScope);
     }
     dmd.write(true);
-
     // Examine all the tables if required.
     if (shouldExamineTablesToDump() || (tableList != null)) {
       // If required wait more for any transactions open at the time of starting the ACID bootstrap.
@@ -538,7 +546,6 @@
       assert (waitUntilTime > 0);
       validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
     }
-
     /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata.
       We need to rewrite the metadata as the write id list will be changed.
      We can't reuse the previous write id as it might be invalid due to compaction.
     */
@@ -587,9 +594,26 @@
     }
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+    work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
     return lastReplId;
   }
 
+  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) {
+    ReplicationMetricCollector collector;
+    String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME);
+    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+    long maxCacheSize = conf.getLong(MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getVarname(),
+        (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal());
+    if (isBootstrap) {
+      collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), policy, executorId,
+          maxCacheSize);
+    } else {
+      collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), policy, executorId,
+          maxCacheSize);
+    }
+    return collector;
+  }
+
   private int getMaxEventAllowed(int currentEventMaxLimit) {
     int maxDirItems = Integer.parseInt(conf.get(ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG, "0"));
     if (maxDirItems > 0) {
@@ -603,7 +627,6 @@
     }
     return currentEventMaxLimit;
   }
-
   private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException {
     Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
     Retry retriable = new Retry(IOException.class) {
@@ -674,6 +697,7 @@
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
+    work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1);
     replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString());
   }
@@ -779,10 +803,16 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
         throw new HiveException("Replication dump not allowed for replicated database"
             + " with first incremental dump pending : " + dbName);
       }
+      int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size();
+      int estimatedNumFunctions = hiveDb.getAllFunctions().size();
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
-          Utils.getAllTables(hiveDb, dbName, work.replScope).size(),
-          hiveDb.getAllFunctions().size());
+          estimatedNumTables,
+          estimatedNumFunctions);
       replLogger.startLog();
+      Map<String, Long> metricMap = new HashMap<>();
+      metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables);
+      metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions);
+      work.getMetricCollector().reportStageStart(getName(), metricMap);
       Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
       Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
       dumpFunctionMetadata(dbName, dbRoot, hiveDb);
@@ -841,11 +871,13 @@
       Long bootDumpEndReplId = currentNotificationId(hiveDb);
       LOG.info("Preparing to return {},{}->{}",
           dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+      long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
       dmd.write(true);
 
       work.setDirCopyIterator(extTableCopyWorks.iterator());
       work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
       return bootDumpBeginReplId;
     }
@@ -912,7 +944,9 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
       tuple.replicationSpec.setRepl(true);
       List managedTableCopyPaths = new TableExport(
-          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false);
+          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser,
+          conf, mmCtx).write(false);
+      work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1);
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
       if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)
           || Utils.shouldDumpMetaDataOnly(conf)) {
@@ -1042,6 +1076,7 @@
         FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
       }
+      work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1);
       replLogger.functionLog(functionName);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 86f92338a6..59cae6b9fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@
   private Path currentDumpPath;
   private List resultValues;
   private boolean shouldOverwrite;
+  private transient ReplicationMetricCollector metricCollector;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     injectNextDumpDirForTest(dumpDir, false);
@@ -190,4 +192,12 @@ public void setResultValues(List resultValues) {
   public void setShouldOverwrite(boolean shouldOverwrite) {
     this.shouldOverwrite = shouldOverwrite;
   }
+
+  public ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
+
+  public void setMetricCollector(ReplicationMetricCollector metricCollector) {
+    this.metricCollector = metricCollector;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 792e331884..5a08aef34c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -19,12 +19,15 @@
 
 import com.google.common.collect.Collections2;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.repl.ReplScope;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
@@ -46,6 +49,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -54,12 +58,13 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.HiveTableName;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.*;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.IOException;
@@ -120,10 +125,20 @@ public int execute() {
       }
     } catch (RuntimeException e) {
       LOG.error("replication failed with run time exception", e);
+      try {
+        work.getMetricCollector().reportEnd(Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       throw e;
     } catch (Exception e) {
       LOG.error("replication failed", e);
       setException(e);
+      try {
+        work.getMetricCollector().reportEnd(Status.FAILED);
+      } catch (SemanticException ex) {
+        LOG.error("Failed to collect Metrics ", ex);
+      }
       return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     }
   }
@@ -136,7 +151,8 @@ private void initiateAuthorizationLoadTask() throws SemanticException {
     if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) {
       Path rangerLoadRoot = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR);
       LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot);
-      RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn);
+      RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn,
+          work.getMetricCollector());
       Task rangerLoadTask = TaskFactory.get(rangerLoadWork, conf);
       if (childTasks == null) {
         childTasks = new ArrayList<>();
@@ -151,7 +167,8 @@ private void addAtlasLoadTask() throws HiveException {
     Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
     LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
-    AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir);
+    AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir,
+        work.getMetricCollector());
     Task atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
     if (childTasks == null) {
       childTasks = new ArrayList<>();
@@ -228,7 +245,7 @@ a database ( directory )
           tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf));
         } else {
           LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext,
-              loadTaskTracker);
+              loadTaskTracker, work.getMetricCollector());
           tableTracker = loadTable.tasks(work.isIncrementalLoad());
         }
@@ -254,7 +271,7 @@
           // for a table we explicitly try to load partitions as there is no separate partitions events.
           LoadPartitions loadPartitions = new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent,
-              work.dbNameToLoadIn, tableContext);
+              work.dbNameToLoadIn, tableContext, work.getMetricCollector());
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
               partitionsTracker);
@@ -321,7 +338,7 @@ private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent ne
     TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
     LoadPartitions loadPartitions = new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
-        event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
+        event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector());
     /* the tableTracker here should be a new instance and not an existing one as this can
        only happen when we break in between loading partitions.
@@ -348,7 +365,7 @@ private TaskTracker addLoadConstraintsTasks(Context loadContext,
   private TaskTracker addLoadFunctionTasks(Context loadContext, BootstrapEventsIterator iterator, BootstrapEvent next,
       TaskTracker dbTracker, Scope scope) throws IOException, SemanticException {
     LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(),
-        (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+        (FunctionEvent) next, work.dbNameToLoadIn, dbTracker, work.getMetricCollector());
     TaskTracker functionsTracker = loadFunction.tasks();
     if (!scope.database) {
       scope.rootTasks.addAll(functionsTracker.tasks());
@@ -442,7 +459,7 @@ private void createEndReplLogTask(Context context, Scope scope,
       Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
       dbProps = dbInMetadata.getParameters();
     }
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps);
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps, work.getMetricCollector());
     Task replLogTask = TaskFactory.get(replLogWork, conf);
     if (scope.rootTasks.isEmpty()) {
       scope.rootTasks.add(replLogTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 26cd59b082..43bf365b4f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -29,6 +29,8 @@
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -45,6 +47,8 @@
   final String dumpDirectory;
   private boolean lastReplIDUpdated;
   private String sourceDbName;
+  private Long dumpExecutionId;
+  private final transient ReplicationMetricCollector metricCollector;
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -62,12 +66,17 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
                       String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope,
-                      LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
+                      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
+                      Long dumpExecutionId,
+                      ReplicationMetricCollector metricCollector) throws IOException, SemanticException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
     this.currentReplScope = currentReplScope;
     this.sourceDbName = sourceDbName;
+    this.dumpExecutionId = dumpExecutionId;
+    this.metricCollector = metricCollector;
+
     // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name.
     if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
@@ -77,7 +86,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
     rootTask = null;
     if (isIncrementalDump) {
       incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
-          new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo);
+          new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector);
 
       /*
        * If the current incremental dump also includes bootstrap for some tables, then create iterator
@@ -87,7 +96,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
       FileSystem fs = incBootstrapDir.getFileSystem(hiveConf);
       if (fs.exists(incBootstrapDir)) {
         this.bootstrapIterator = new BootstrapEventsIterator(
-            new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, hiveConf);
+            new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true,
+            hiveConf, metricCollector);
         this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       } else {
         this.bootstrapIterator = null;
@@ -95,7 +105,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
     } else {
       this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
-          .toString(), dbNameToLoadIn, true, hiveConf);
+          .toString(), dbNameToLoadIn, true, hiveConf, metricCollector);
       this.constraintsIterator = new ConstraintEventsIterator(
           new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME).toString(), hiveConf);
       incrementalLoadTasksBuilder = null;
@@ -158,4 +168,12 @@ public void setLastReplIDUpdated(boolean lastReplIDUpdated) {
   public String getSourceDbName() {
     return sourceDbName;
   }
+
+  public ReplicationMetricCollector getMetricCollector() {
+    return metricCollector;
+  }
+
+  public Long getDumpExecutionId() {
+    return dumpExecutionId;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
index 7ade7c07d7..240f5a7db6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.Serializable;
@@ -34,7 +36,13 @@
   @Override
   public int execute() {
-    work.replStateLog();
+    try {
+      work.replStateLog();
+    } catch (SemanticException e) {
+      LOG.error("Exception while logging metrics ", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
     return 0;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
index 37725d68c6..99fca5dd2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
@@ -19,8 +19,12 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -43,6 +47,7 @@
   private TableType tableType;
   private String functionName;
   private String lastReplId;
+  private final transient ReplicationMetricCollector metricCollector;
 
   private enum LOG_TYPE {
     TABLE,
@@ -51,50 +56,58 @@
     END
   }
 
-  public ReplStateLogWork(ReplLogger replLogger, String eventId, String eventType) {
+  public ReplStateLogWork(ReplLogger replLogger, ReplicationMetricCollector metricCollector,
+                          String eventId, String eventType) {
     this.logType = LOG_TYPE.EVENT;
     this.replLogger = replLogger;
     this.eventId = eventId;
     this.eventType = eventType;
+    this.metricCollector = metricCollector;
   }
 
-  public ReplStateLogWork(ReplLogger replLogger, String tableName, TableType tableType) {
+  public ReplStateLogWork(ReplLogger replLogger, ReplicationMetricCollector metricCollector,
+                          String tableName, TableType tableType) {
     this.logType = LOG_TYPE.TABLE;
     this.replLogger = replLogger;
     this.tableName = tableName;
     this.tableType = tableType;
+    this.metricCollector = metricCollector;
   }
 
-  public ReplStateLogWork(ReplLogger replLogger, String functionName) {
+  public ReplStateLogWork(ReplLogger replLogger, String functionName, ReplicationMetricCollector metricCollector) {
     this.logType = LOG_TYPE.FUNCTION;
     this.replLogger = replLogger;
     this.functionName = functionName;
+    this.metricCollector = metricCollector;
  }
 
-  public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps) {
+  public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps,
+                          ReplicationMetricCollector metricCollector) {
     this.logType = LOG_TYPE.END;
     this.replLogger = replLogger;
     this.lastReplId = ReplicationSpec.getLastReplicatedStateFromParameters(dbProps);
+    this.metricCollector = metricCollector;
   }
 
-  public void replStateLog() {
+  public void replStateLog() throws SemanticException {
     switch (logType) {
-    case TABLE: {
-      replLogger.tableLog(tableName, tableType);
-      break;
-    }
-    case FUNCTION: {
-      replLogger.functionLog(functionName);
-      break;
-    }
-    case EVENT: {
-      replLogger.eventLog(eventId, eventType);
-      break;
-    }
-    case END: {
-      replLogger.endLog(lastReplId);
-      break;
-    }
+    case TABLE:
+      replLogger.tableLog(tableName, tableType);
+      metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.TABLES.name(), 1);
+      break;
+    case FUNCTION:
+      replLogger.functionLog(functionName);
+      metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.FUNCTIONS.name(), 1);
+      break;
+    case EVENT:
+      replLogger.eventLog(eventId, eventType);
+      metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.EVENTS.name(), 1);
+      break;
+    case END:
+      replLogger.endLog(lastReplId);
+      metricCollector.reportStageEnd("REPL_LOAD", Status.SUCCESS, Long.parseLong(lastReplId));
+      metricCollector.reportEnd(Status.SUCCESS);
+      break;
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 5bbe20c8c6..99355ec4e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -23,13 +23,17 @@
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -76,9 +80,12 @@ private final HiveConf hiveConf; private final boolean needLogger; private ReplLogger replLogger; + private final transient ReplicationMetricCollector metricCollector; - public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf) + public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf, + ReplicationMetricCollector metricCollector) throws IOException { + this.metricCollector = metricCollector; Path path = new Path(dumpDirectory); FileSystem fileSystem = path.getFileSystem(hiveConf); if (!fileSystem.exists(path)) { @@ -123,6 +130,7 @@ public boolean hasNext() { if (needLogger) { initReplLogger(); } + initMetricCollector(); } else { return false; } @@ -161,17 +169,16 @@ public ReplLogger replLogger() { return replLogger; } + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } + private void initReplLogger() { try { Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); FileSystem fs = dbDumpPath.getFileSystem(hiveConf); - - long numTables = getSubDirs(fs, dbDumpPath).length; - long numFunctions = 0; - Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); - if (fs.exists(funcPath)) { - numFunctions = getSubDirs(fs, funcPath).length; - } + long numTables = getNumTables(dbDumpPath, fs); + long numFunctions = getNumFunctions(dbDumpPath, fs); String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
dbDumpPath.getName() : dbNameToLoadIn; replLogger = new BootstrapLoadLogger(dbName, dumpDirectory, numTables, numFunctions); replLogger.startLog(); @@ -180,6 +187,35 @@ private void initReplLogger() { } } + private long getNumFunctions(Path dbDumpPath, FileSystem fs) throws IOException { + Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); + if (fs.exists(funcPath)) { + return getSubDirs(fs, funcPath).length; + } + return 0; + } + + private long getNumTables(Path dbDumpPath, FileSystem fs) throws IOException { + return getSubDirs(fs, dbDumpPath).length; + } + + private void initMetricCollector() { + try { + Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); + FileSystem fs = dbDumpPath.getFileSystem(hiveConf); + long numTables = getNumTables(dbDumpPath, fs); + long numFunctions = getNumFunctions(dbDumpPath, fs); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) numTables); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) numFunctions); + metricCollector.reportStageStart("REPL_LOAD", metricMap); + } catch (IOException e) { + // Ignore the exception + } catch (SemanticException e) { + throw new RuntimeException("Failed to collect Metrics ", e); + } + } + FileStatus[] getSubDirs(FileSystem fs, Path dirPath) throws IOException { return fs.listStatus(dirPath, new PathFilter() { @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index 8815eeebe1..667ec7ff31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,19 +56,21 @@ private final FunctionEvent event; private final String dbNameToLoadIn; private final TaskTracker tracker; + private final ReplicationMetricCollector metricCollector; public LoadFunction(Context context, ReplLogger replLogger, FunctionEvent event, - String dbNameToLoadIn, TaskTracker existingTracker) { + String dbNameToLoadIn, TaskTracker existingTracker, ReplicationMetricCollector metricCollector) { this.context = context; this.replLogger = replLogger; this.event = event; this.dbNameToLoadIn = dbNameToLoadIn; this.tracker = new TaskTracker(existingTracker); + this.metricCollector = metricCollector; } private void createFunctionReplLogTask(List> functionTasks, String functionName) { - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName, metricCollector); Task replLogTask = TaskFactory.get(replLogWork, context.hiveConf); DAGTraversal.traverse(functionTasks, new AddDependencyToLeaves(replLogTask)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index b36c4a531f..b78df44e84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; @@ -77,19 +78,22 @@ private final TableEvent event; private final TaskTracker tracker; private final AlterTableAddPartitionDesc lastReplicatedPartition; + private final ReplicationMetricCollector metricCollector; private final ImportTableDesc tableDesc; private Table table; public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, TableEvent event, String dbNameToLoadIn, - TableContext tableContext) throws HiveException { - this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null); + TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException { + this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, + metricCollector); } public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter, TableEvent event, String dbNameToLoadIn, - AlterTableAddPartitionDesc lastReplicatedPartition) throws HiveException { + AlterTableAddPartitionDesc lastReplicatedPartition, + ReplicationMetricCollector metricCollector) throws HiveException { this.tracker = new TaskTracker(limiter); this.event = event; this.context = context; @@ -99,6 +103,7 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext table this.tableDesc = event.tableDesc(dbNameToLoadIn); this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); + this.metricCollector = metricCollector; } public TaskTracker tasks() throws Exception { @@ -118,7 +123,7 @@ public TaskTracker tasks() throws Exception { if (!forNewTable().hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); tracker.addDependentTask(replLogTask); } return tracker; @@ -132,7 +137,7 @@ public TaskTracker tasks() throws Exception { if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); tracker.addDependentTask(replLogTask); } return tracker; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 6cea22c01f..9e236fd697 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import 
org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -73,14 +74,16 @@ private final TableContext tableContext; private final TaskTracker tracker; private final TableEvent event; + private final ReplicationMetricCollector metricCollector; public LoadTable(TableEvent event, Context context, ReplLogger replLogger, - TableContext tableContext, TaskTracker limiter) { + TableContext tableContext, TaskTracker limiter, ReplicationMetricCollector metricCollector) { this.event = event; this.context = context; this.replLogger = replLogger; this.tableContext = tableContext; this.tracker = new TaskTracker(limiter); + this.metricCollector = metricCollector; } public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception { @@ -151,7 +154,7 @@ public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception { ); if (!isPartitioned(tableDesc)) { Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); ckptTask.addDependentTask(replLogTask); } tracker.addDependentTask(ckptTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 7e844d3164..e4d20d1e75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.slf4j.Logger; @@ -73,9 +75,12 @@ private final ReplLogger replLogger; private static long numIteration; private final Long eventTo; + private final ReplicationMetricCollector metricCollector; public IncrementalLoadTasksBuilder(String dbName, String loadPath, - IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { + IncrementalLoadEventsIterator iterator, HiveConf conf, + Long eventTo, + ReplicationMetricCollector metricCollector) { this.dbName = dbName; this.iterator = iterator; inputs = new HashSet<>(); @@ -85,7 +90,8 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); replLogger.startLog(); this.eventTo = eventTo; - numIteration = 0; + setNumIteration(0); + this.metricCollector = metricCollector; } public Task build(Context context, Hive hive, Logger log, @@ -96,7 +102,9 @@ public 
IncrementalLoadTasksBuilder(String dbName, String loadPath, this.log = log; numIteration++; this.log.debug("Iteration num " + numIteration); - + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) iterator.getNumEvents()); + this.metricCollector.reportStageStart("REPL_LOAD", metricMap); while (iterator.hasNext() && tracker.canAddMoreTasks()) { FileStatus dir = iterator.next(); String location = dir.getPath().toUri().toString(); @@ -135,7 +143,7 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, List> evTasks = analyzeEventLoad(mhContext); if ((evTasks != null) && (!evTasks.isEmpty())) { - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, metricCollector, dir.getPath().getName(), eventDmd.getDumpType().toString()); Task barrierTask = TaskFactory.get(replStateLogWork, conf); @@ -157,7 +165,7 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, Map dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, metricCollector); Task barrierTask = TaskFactory.get(replStateLogWork, conf); taskChainTail.addDependentTask(barrierTask); this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", @@ -364,6 +372,10 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa return tasks; } + private static void setNumIteration(int count) { + numIteration = count; + } + public Long eventTo() { return eventTo; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index c0aadb5aa2..4db5addeef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -134,6 +135,13 @@ LOAD_NEW, LOAD_SKIP, LOAD_REPLACE } + /** + * Replication Metrics + */ + public enum MetricName { + TABLES, FUNCTIONS, EVENTS, POLICIES, TAGS + } + public static Map> genPartSpecs( Table table, List> partitions) throws SemanticException { Map> partSpecs = new HashMap<>(); @@ -167,10 +175,12 @@ return partSpecs; } - public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) + public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf, + ReplicationMetricCollector metricCollector) throws SemanticException { TableType tableType = tableDesc.isExternal() ? 
TableType.EXTERNAL_TABLE : tableDesc.tableType(); - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableType); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, metricCollector, + tableDesc.getTableName(), tableType); return TaskFactory.get(replLogWork, conf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 7959df2b2f..08cbf5edc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -24,10 +24,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.repl.ReplScope; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; @@ -41,6 +43,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.PlanUtils; import java.io.IOException; @@ -398,7 +403,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern, replScope.getDbName(), dmd.getReplScope(), - queryState.getLineageState(), evDump, dmd.getEventTo()); + queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId(), + initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(), + dmd.getDumpExecutionId())); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } else { LOG.warn("Previous Dump Already Loaded"); @@ -409,6 +416,23 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } } + private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, String dumpDirectory, + String dbNameToLoadIn, long dumpExecutionId) { + ReplicationMetricCollector collector; + String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + long maxCacheSize = conf.getLong(MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getVarname(), + (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal()); + if (isBootstrap) { + collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, policy, executorId, + dumpExecutionId, maxCacheSize); + } else { + collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, policy, executorId, + dumpExecutionId, maxCacheSize); + } + return collector; + } + private Path getCurrentLoadPath() throws IOException, SemanticException { Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase() 
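initMetricCollection() above hands ReplLoadWork an active collector only for scheduled executions: as the ReplicationMetricCollector constructor added later in this patch shows, collection stays disabled unless the conf carries a schedule name and a positive execution id, so ad-hoc REPL LOAD commands record nothing. Below is a standalone sketch of that gating predicate; the conf key strings are placeholders for Constants.SCHEDULED_QUERY_SCHEDULENAME and Constants.SCHEDULED_QUERY_EXECUTIONID, whose literal values are not shown in this patch.

import java.util.Map;

public class MetricGatingExample {

  static boolean metricsEnabled(Map<String, String> conf) {
    // Placeholder keys; the patch reads these through the Constants class.
    String policy = conf.get("scheduled.query.schedulename");
    long executionId = Long.parseLong(conf.getOrDefault("scheduled.query.executionid", "0"));
    // Mirrors the constructor check: a non-empty policy AND a positive scheduled execution id.
    return policy != null && !policy.isEmpty() && executionId > 0;
  }

  public static void main(String[] args) {
    // Ad-hoc REPL LOAD: the scheduler never set the properties, so collection is off.
    System.out.println(metricsEnabled(Map.of()));
    // Scheduled REPL LOAD: the scheduler populates both, so metrics flow.
    System.out.println(metricsEnabled(Map.of(
        "scheduled.query.schedulename", "s2_t2",
        "scheduled.query.executionid", "42")));
  }
}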
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java new file mode 100644 index 0000000000..a5463ae875 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * BootstrapDumpMetricCollector. + * Bootstrap Dump Metric Collector + */ +public class BootstrapDumpMetricCollector extends ReplicationMetricCollector { + public BootstrapDumpMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, policy, executionId, 0, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java new file mode 100644 index 0000000000..9038b299ac --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * IncrementalDumpMetricCollector. 
+ * Incremental Dump Metric Collector + */ +public class IncrementalDumpMetricCollector extends ReplicationMetricCollector { + public IncrementalDumpMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, policy, executionId, 0, + maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index e538c79f34..dc40e1df9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -51,6 +51,7 @@ private boolean initialized = false; private final Path dumpFile; private final HiveConf hiveConf; + private Long dumpExecutionId; public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -60,15 +61,16 @@ public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, HiveConf hiveConf) { this(dumpRoot, hiveConf); - setDump(lvl, eventFrom, eventTo, cmRoot); + setDump(lvl, eventFrom, eventTo, cmRoot, 0L); } - public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) { + public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId) { this.dumpType = lvl; this.eventFrom = eventFrom; this.eventTo = eventTo; this.cmRoot = cmRoot; this.initialized = true; + this.dumpExecutionId = dumpExecutionId; } public void setPayload(String payload) { @@ -115,11 +117,11 @@ private void loadDumpFromFile() throws SemanticException { br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line; if ((line = br.readLine()) != null) { - String[] lineContents = line.split("\t", 5); + String[] lineContents = line.split("\t", 6); setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]), - new Path(lineContents[3])); - setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]); + new Path(lineContents[3]), Long.valueOf(lineContents[4])); + setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]); } else { throw new IOException( "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); @@ -158,6 +160,11 @@ public Long getEventTo() throws SemanticException { return eventTo; } + public Long getDumpExecutionId() throws SemanticException { + initializeIfNot(); + return dumpExecutionId; + } + public ReplScope getReplScope() throws SemanticException { initializeIfNot(); return replScope; @@ -207,6 +214,7 @@ public void write(boolean replace) throws SemanticException { eventFrom.toString(), eventTo.toString(), cmRoot.toString(), + dumpExecutionId.toString(), payload) ); if (replScope != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java new file mode 100644 index 0000000000..df60cff69e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * BootstrapLoadMetricCollector. + * Bootstrap Load Metric Collector + */ +public class BootstrapLoadMetricCollector extends ReplicationMetricCollector { + public BootstrapLoadMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long dumpExecutionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, policy, executionId, dumpExecutionId, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java new file mode 100644 index 0000000000..839b3b34ed --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * IncrementalLoadMetricCollector. + * Incremental Load Metric Collector + */ +public class IncrementalLoadMetricCollector extends ReplicationMetricCollector { + public IncrementalLoadMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long dumpExecutionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, policy, executionId, dumpExecutionId, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java new file mode 100644 index 0000000000..fc1a9b540a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * MetricCollector. + * In memory collection of metrics + */ +public final class MetricCollector { + private static final Logger LOG = LoggerFactory.getLogger(MetricCollector.class); + private Map<Long, ReplicationMetric> metricMap = new ConcurrentHashMap<>(); + private static long maxSize = (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal(); + private static boolean isInited = false; + private static volatile MetricCollector instance; + + private MetricCollector(){ + } + + public static MetricCollector getInstance() { + if (instance == null) { + synchronized (MetricCollector.class) { + if (instance == null) { + instance = new MetricCollector(); + } + } + } + return instance; + } + + public synchronized MetricCollector init(long cacheSize) { + //Can initialize the cache only once with a value. + if (!isInited) { + maxSize = cacheSize; + isInited = true; + } else { + LOG.warn("Metric Collection cache is already initialised with size {} .", maxSize); + } + return instance; + } + + public synchronized void addMetric(ReplicationMetric replicationMetric) throws SemanticException { + if (metricMap.size() > maxSize) { + throw new SemanticException("Metrics are not getting collected. "); + } else { + if (metricMap.size() > 0.8 * maxSize) { //soft limit + LOG.warn("Metrics cache is more than 80 % full. Will start dropping metrics once full. "); + } + metricMap.put(replicationMetric.getScheduledExecutionId(), replicationMetric); + } + } + + public synchronized List<ReplicationMetric> getMetrics() { + List<ReplicationMetric> metricList = new ArrayList<>(metricMap.values()); + metricMap.clear(); + return metricList; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java new file mode 100644 index 0000000000..68a27d5832 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric; + +/** + * MetricSink. + * Scheduled thread to poll from Metric Collector and persists to DB + */ +public class MetricSink { +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java new file mode 100644 index 0000000000..cbae9c776d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric; + +import java.util.Map; + +/** + * Abstract class for Replication Metric Collection. 
+ */ +public abstract class ReplicationMetricCollector { + private ReplicationMetric replicationMetric; + private MetricCollector metricCollector; + private boolean isEnabled; + + public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replicationType, + String stagingDir, String policy, long executionId, + long dumpExecutionId, long maxCacheSize) { + if (!StringUtils.isEmpty(policy) && executionId > 0) { + isEnabled = true; + metricCollector = MetricCollector.getInstance().init(maxCacheSize); + Metadata metadata = new Metadata(dbName, replicationType, stagingDir); + replicationMetric = new ReplicationMetric(executionId, policy, dumpExecutionId, metadata); + } + } + + public void reportStageStart(String stageName, Map metricMap) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis()); + for (Map.Entry metric : metricMap.entrySet()) { + stage.addMetric(new Metric(metric.getKey(), metric.getValue())); + } + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + + public void reportStageEnd(String stageName, Status status, long lastReplId) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); + progress.addStage(stage); + replicationMetric.setProgress(progress); + Metadata metadata = replicationMetric.getMetadata(); + metadata.setLastReplId(lastReplId); + replicationMetric.setMetadata(metadata); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportStageEnd(String stageName, Status status) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportStageProgress(String stageName, String metricName, long count) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + Metric metric = stage.getMetricByName(metricName); + metric.setCurrentCount(metric.getCurrentCount() + count); + if (metric.getCurrentCount() > metric.getTotalCount()) { + metric.setTotalCount(metric.getCurrentCount()); + } + stage.addMetric(metric); + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportEnd(Status status) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + progress.setStatus(status); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java new file mode 100644 index 0000000000..e707a6870e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the metadata info for replication metrics. + */ +public class Metadata { + /** + * Type of replication + */ + public enum ReplicationType { + BOOTSTRAP, + INCREMENTAL + } + private String dbName; + private ReplicationType replicationType; + private String stagingDir; + private long lastReplId; + + public Metadata(String dbName, ReplicationType replicationType, String stagingDir) { + this.dbName = dbName; + this.replicationType = replicationType; + this.stagingDir = stagingDir; + } + + public long getLastReplId() { + return lastReplId; + } + + public String getDbName() { + return dbName; + } + + public ReplicationType getReplicationType() { + return replicationType; + } + + public String getStagingDir() { + return stagingDir; + } + + public void setLastReplId(long lastReplId) { + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java new file mode 100644 index 0000000000..fdd6fa403f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the unit metric. 
+ */ +public class Metric { + private String name; + private long currentCount; + private long totalCount; + + public Metric(String name, long totalCount) { + this.name = name; + this.totalCount = totalCount; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getCurrentCount() { + return currentCount; + } + + public void setCurrentCount(long currentCount) { + this.currentCount = currentCount; + } + + public long getTotalCount() { + return totalCount; + } + + public void setTotalCount(long totalCount) { + this.totalCount = totalCount; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java new file mode 100644 index 0000000000..32018198f1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class for defining the progress info for replication metrics. + */ +public class Progress { + + private Status status; + + private Map stages = new ConcurrentHashMap<>(); + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public void addStage(Stage stage) { + stages.put(stage.getName(), stage); + } + + public Stage getStageByName(String stageName) { + return stages.get(stageName); + } + + public List getStages() { + return new ArrayList<>(stages.values()); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java new file mode 100644 index 0000000000..cc833613be --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the replication metrics. + */ +public class ReplicationMetric { + private long scheduledExecutionId; + private String policy; + private long dumpExecutionId; + private Metadata metadata; + private Progress progress; + + public ReplicationMetric(long scheduledExecutionId, String policy, long dumpExecutionId, Metadata metadata){ + this.scheduledExecutionId = scheduledExecutionId; + this.policy = policy; + this.dumpExecutionId = dumpExecutionId; + this.metadata = metadata; + this.progress = new Progress(); + } + + public long getScheduledExecutionId() { + return scheduledExecutionId; + } + + + public String getPolicy() { + return policy; + } + + public long getDumpExecutionId() { + return dumpExecutionId; + } + + public Progress getProgress() { + return progress; + } + + public void setMetadata(Metadata metadata) { + this.metadata = metadata; + } + + public Metadata getMetadata() { + return metadata; + } + + public void setProgress(Progress progress) { + this.progress = progress; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java new file mode 100644 index 0000000000..5e87cc2f5b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Class for defining the different stages of replication. 
+ */ +public class Stage { + private String name; + private Status status; + private long startTime; + private long endTime; + private Map metrics = new HashMap<>(); + + public Stage(String name, Status status, long startTime) { + this.name = name; + this.status = status; + this.startTime = startTime; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + + public void addMetric(Metric metric) { + this.metrics.put(metric.getName(), metric); + } + + public Metric getMetricByName(String name) { + return this.metrics.get(name); + } + + public List getMetrics() { + return new ArrayList<>(metrics.values()); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java new file mode 100644 index 0000000000..96cf565f76 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Enum to define the status. + */ +public enum Status { + SUCCESS, + FAILED, + IN_PROGRESS +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java new file mode 100644 index 0000000000..f575e3d0b1 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.Arrays; + +/** + * Unit Test class for In Memory Replication Metric Collection. + */ +@RunWith(MockitoJUnitRunner.class) +public class TestReplicationMetricCollector { + + + + @Before + public void setup() throws Exception { + MetricCollector.getInstance().init(1); + } + + @Test + public void testFailureCacheHardLimit() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + + ReplicationMetricCollector incrDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 2, 1); + metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10); + incrDumpMetricCollector.reportStageStart("dump", metricMap); + try { + incrDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.fail(); + } catch (SemanticException e) { + Assert.assertEquals("Metrics are not getting collected. 
", e.getMessage()); + } + } + + @Test + public void testFailureNoScheduledId() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 0, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.assertEquals(0, MetricCollector.getInstance().getMetrics().size()); + } + + @Test + public void testFailureNoPolicyId() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "", 0, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.assertEquals(0, MetricCollector.getInstance().getMetrics().size()); + } + + @Test + public void testSuccessBootstrapDumpMetrics() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + bootstrapDumpMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0, expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", + Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name())); + } + + @Test + public void testSuccessIncrDumpMetrics() throws Exception { + ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db", + 
"staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + incrDumpMetricCollector.reportStageStart("dump", metricMap); + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + incrDumpMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0, + expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", + Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name())); + } + + @Test + public void testSuccessBootstrapLoadMetrics() throws Exception { + ReplicationMetricCollector bootstrapLoadMetricCollector = new BootstrapLoadMetricCollector("db", + "staging", "repl", 1, 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapLoadMetricCollector.reportStageStart("dump", metricMap); + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapLoadMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + bootstrapLoadMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new 
+  @Test
+  public void testSuccessIncrLoadMetrics() throws Exception {
+    ReplicationMetricCollector incrLoadMetricCollector = new IncrementalLoadMetricCollector("db",
+        "staging", "repl", 1, 1, 1);
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
+    metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
+    incrLoadMetricCollector.reportStageStart("dump", metricMap);
+    incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1);
+    List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2);
+    incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    incrLoadMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10);
+    incrLoadMetricCollector.reportEnd(Status.SUCCESS);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging");
+    expectedMetadata.setLastReplId(10);
+    Progress expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10);
+    expectedTableMetric.setCurrentCount(3);
+    Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    expectedFuncMetric.setCurrentCount(1);
+    dumpStage.addMetric(expectedTableMetric);
+    dumpStage.addMetric(expectedFuncMetric);
+    expectedProgress.addStage(dumpStage);
+    ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 1, expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+        Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+  }
+
+  // Compares the identifying fields, metadata, overall status and per-metric counts of an
+  // actual metric against the expected one.
+  private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, String stageName,
+                            List<String> metricNames) {
+    Assert.assertEquals(expected.getDumpExecutionId(), actual.getDumpExecutionId());
+    Assert.assertEquals(expected.getPolicy(), actual.getPolicy());
+    Assert.assertEquals(expected.getScheduledExecutionId(), actual.getScheduledExecutionId());
+    Assert.assertEquals(expected.getMetadata().getReplicationType(), actual.getMetadata().getReplicationType());
+    Assert.assertEquals(expected.getMetadata().getDbName(), actual.getMetadata().getDbName());
+    Assert.assertEquals(expected.getMetadata().getStagingDir(), actual.getMetadata().getStagingDir());
+    Assert.assertEquals(expected.getMetadata().getLastReplId(), actual.getMetadata().getLastReplId());
+    Assert.assertEquals(expected.getProgress().getStatus(), actual.getProgress().getStatus());
+    Assert.assertEquals(expected.getProgress().getStageByName(stageName).getStatus(),
+        actual.getProgress().getStageByName(stageName).getStatus());
+    for (String metricName : metricNames) {
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount(),
+          actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount());
+      Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName)
+          .getCurrentCount(), actual.getProgress()
+          .getStageByName(stageName).getMetricByName(metricName).getCurrentCount());
+    }
+  }
+
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index d1db106270..db1017beda 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -963,6 +963,17 @@ public static ConfVars getMetaConf(String name) {
         "hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
         "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." +
             "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."),
+    REPL_METRICS_CACHE_MAXSIZE("metastore.repl.metrics.cache.maxsize",
+        "hive.repl.metrics.cache.maxsize", 10000 /*10000 rows */,
+        "Maximum size of the in-memory cache that collects replication metrics. Metrics are pushed to"
+            + " persistent storage at the frequency defined by hive.repl.metrics.update.frequency and are"
+            + " held in this cache until they are persisted, so size this property based on the number of"
+            + " concurrently running policies and the persistence frequency."
+    ),
+    REPL_METRICS_UPDATE_FREQUENCY("metastore.repl.metrics.update.frequency",
+        "hive.repl.metrics.update.frequency", 1 /*1 minute */,
+        "Frequency, in minutes, at which replication metrics are written to persistent storage."
+    ),
     SCHEMA_INFO_CLASS("metastore.schema.info.class", "hive.metastore.schema.info.class",
         "org.apache.hadoop.hive.metastore.MetaStoreSchemaInfo",
         "Fully qualified class name for the metastore schema information class \n"
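
The two knobs above can be exercised from a plain metastore Configuration. The sketch below is illustrative only and is not part of the patch: the ReplMetricsConfExample class name is invented here, and it assumes the existing MetastoreConf.newMetastoreConf() factory.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    public class ReplMetricsConfExample {
      public static void main(String[] args) {
        // Start from a metastore-flavoured Configuration.
        Configuration conf = MetastoreConf.newMetastoreConf();
        // Hold up to 20000 metrics in memory before the cache hard limit trips.
        conf.set("metastore.repl.metrics.cache.maxsize", "20000");
        // Persist collected metrics every 2 minutes instead of the default 1.
        conf.set("metastore.repl.metrics.update.frequency", "2");
        System.out.println(conf.get("metastore.repl.metrics.cache.maxsize"));
      }
    }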