diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 7b2c234600..bf8be65b50 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -77,4 +77,9 @@ /** A named lock is acquired prior to executing the query; enabling to run queries in parallel which might interfere with eachother. */ public static final String HIVE_QUERY_EXCLUSIVE_LOCK = "hive.query.exclusive.lock"; + + public static final String SCHEDULED_QUERY_NAMESPACE = "scheduled.query.namespace"; + public static final String SCHEDULED_QUERY_SCHEDULENAME = "scheduled.query.schedulename"; + public static final String SCHEDULED_QUERY_EXECUTIONID = "scheduled.query.executionid"; + public static final String SCHEDULED_QUERY_USER = "scheduled.query.user"; } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 641df005ed..e618cbdaac 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -430,7 +430,7 @@ private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIn confTemp.set("hive.repl.enable.move.optimization", "true"); Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb, - null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId)); + null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), 0L); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext()); replLoadTask.executeTask(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 26cdc6bcfc..5f91d0426e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +47,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * Atlas Metadata Replication Dump Task. 
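Note: the AtlasDumpTask hunks below are the first of several call sites in this patch that follow one metric-collection lifecycle per task stage. A minimal sketch of that pattern, assembled from the hunks in this patch (the standalone framing is illustrative, not a verbatim excerpt):

    // Register the counters the stage will track, with expected totals (0L when unknown up front).
    Map<String, Long> metricMap = new HashMap<>();
    metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L);
    work.getMetricCollector().reportStageStart(getName(), metricMap);
    // ... perform the dump/load work, incrementing a counter as items complete ...
    work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TAGS.name(), importCount);
    // Close the stage with a terminal status.
    work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
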
@@ -67,6 +64,9 @@ public int execute() { AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:", atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L); + work.getMetricCollector().reportStageStart(getName(), metricMap); atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint()) .getClient(atlasReplInfo.getConf()); AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); @@ -75,10 +75,16 @@ public int execute() { long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid); dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo); createDumpMetadata(atlasReplInfo, currentModifiedTime); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS); return 0; } catch (Exception e) { LOG.error("Exception while dumping atlas metadata", e); setException(e); + try { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics ", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java index 3344152f43..3f10730be4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -34,13 +35,16 @@ private final Path stagingDir; private final boolean bootstrap; private final Path prevAtlasDumpDir; + private final transient ReplicationMetricCollector metricCollector; - public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) { + public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, + ReplicationMetricCollector metricCollector) { this.srcDB = srcDB; this.stagingDir = stagingDir; this.bootstrap = bootstrap; this.prevAtlasDumpDir = prevAtlasDumpDir; + this.metricCollector = metricCollector; } public boolean isBootstrap() { @@ -58,4 +62,8 @@ public String getSrcDB() { public Path getStagingDir() { return stagingDir; } + + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index fceded5fdc..077cdde731 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; /** * Atlas Metadata Replication Load 
Task. @@ -54,14 +57,24 @@ public int execute() { try { AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TAGS.name(), 0L); + work.getMetricCollector().reportStageStart(getName(), metricMap); LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}", atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); int importCount = importAtlasMetadata(atlasReplInfo); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TAGS.name(), importCount); LOG.info("Atlas entities import count {}", importCount); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS); return 0; } catch (Exception e) { LOG.error("Exception while loading atlas metadata", e); setException(e); + try { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics ", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java index 4dc1ea81a6..817c214675 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -33,11 +34,13 @@ private final String srcDB; private final String tgtDB; private final Path stagingDir; + private final transient ReplicationMetricCollector metricCollector; - public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) { + public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, ReplicationMetricCollector metricCollector) { this.srcDB = srcDB; this.tgtDB = tgtDB; this.stagingDir = stagingDir; + this.metricCollector = metricCollector; } public static long getSerialVersionUID() { @@ -55,4 +58,8 @@ public String getTgtDB() { public Path getStagingDir() { return stagingDir; } + + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 5a56a6be95..92ca6ea6ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -33,13 +33,16 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.net.URL; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * RangerDumpTask. 
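Note: every task in this patch closes its stage with Status.FAILED from the catch block, wrapping that call in a nested try/catch. A condensed sketch of the convention, assembled from the hunks above and below (reportStageEnd itself declares SemanticException, and a metrics failure must not replace the error code derived from the original exception):

    } catch (Exception e) {
      setException(e);
      try {
        work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
      } catch (SemanticException ex) {
        LOG.error("Failed to collect Metrics ", ex);
      }
      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
    }
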
@@ -77,6 +80,11 @@ public int execute() { long exportCount = 0; Path filePath = null; LOG.info("Exporting Ranger Metadata"); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L); + work.getMetricCollector().reportStageStart(getName(), metricMap); + replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString()); + replLogger.startLog(); if (rangerRestClient == null) { rangerRestClient = getRangerRestClient(); } @@ -91,8 +99,6 @@ public int execute() { if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) { throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint); } - replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString()); - replLogger.startLog(); RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint, work.getDbName(), rangerHiveServiceName); List<RangerPolicy> rangerPolicies = rangerExportPolicyList.getPolicies(); @@ -109,15 +115,22 @@ public int execute() { if (filePath != null) { LOG.info("Ranger policy export finished successfully"); exportCount = rangerExportPolicyList.getListSize(); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), exportCount); } } replLogger.endLog(exportCount); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS); LOG.debug("Ranger policy export filePath:" + filePath); LOG.info("Number of ranger policies exported {}", exportCount); return 0; } catch (Exception e) { LOG.error("failed", e); setException(e); + try { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics ", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java index 026402b43e..b1393b20d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java @@ -19,6 +19,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import java.io.Serializable; @@ -36,10 +37,12 @@ private static final long serialVersionUID = 1L; private Path currentDumpPath; private String dbName; + private final transient ReplicationMetricCollector metricCollector; - public RangerDumpWork(Path currentDumpPath, String dbName) { + public RangerDumpWork(Path currentDumpPath, String dbName, ReplicationMetricCollector metricCollector) { this.currentDumpPath = currentDumpPath; this.dbName = dbName; + this.metricCollector = metricCollector; } public Path getCurrentDumpPath() { @@ -53,4 +56,8 @@ public String getDbName() { URL getRangerConfigResource() { return getClass().getClassLoader().getResource(ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME); } + + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 4e8a44fdae..417264cf3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -32,7 
+32,9 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +42,9 @@ import java.io.Serializable; import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_ADD_DENY_POLICY_TARGET; /** @@ -101,6 +105,9 @@ public int execute() { replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(), work.getCurrentDumpPath().toString(), expectedPolicyCount); replLogger.startLog(); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.POLICIES.name(), (long) expectedPolicyCount); + work.getMetricCollector().reportStageStart(getName(), metricMap); if (rangerExportPolicyList != null && !CollectionUtils.isEmpty(rangerExportPolicyList.getPolicies())) { rangerPolicies = rangerExportPolicyList.getPolicies(); } @@ -129,13 +136,20 @@ public int execute() { rangerHiveServiceName); LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize()); importCount = rangerExportPolicyList.getListSize(); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), importCount); replLogger.endLog(importCount); LOG.info("Ranger policy import finished {} ", importCount); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS); } return 0; } catch (Exception e) { LOG.error("Failed", e); setException(e); + try { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java index cddca6076a..f42575b85d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java @@ -19,6 +19,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +41,14 @@ private Path currentDumpPath; private String targetDbName; private String sourceDbName; + private final transient ReplicationMetricCollector metricCollector; - public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName) { + public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName, + ReplicationMetricCollector metricCollector) { this.currentDumpPath = currentDumpPath; this.targetDbName = targetDbName; this.sourceDbName = sourceDbName; + this.metricCollector = metricCollector; } public Path getCurrentDumpPath() { @@ -62,4 +66,8 @@ public String getSourceDbName() { URL getRangerConfigResource() { return getClass().getClassLoader().getResource(ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME); } + + ReplicationMetricCollector 
getMetricCollector() { + return metricCollector; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 046b6a00de..e4bafb728f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.ReplChangeManager; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; @@ -71,7 +73,11 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.thrift.TException; @@ -88,15 +94,7 @@ import java.io.UnsupportedEncodingException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Set; -import java.util.HashSet; -import java.util.List; -import java.util.Arrays; -import java.util.Collections; -import java.util.Base64; -import java.util.LinkedList; -import java.util.UUID; -import java.util.ArrayList; +import java.util.*; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; @@ -147,6 +145,7 @@ public int execute() { Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); work.setCurrentDumpPath(currentDumpPath); + work.setMetricCollector(initMetricCollection(isBootstrap, hiveDumpRoot)); if (shouldDumpAtlasMetadata()) { addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath); LOG.info("Added task to dump atlas metadata."); @@ -174,6 +173,11 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + try { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } return 0; @@ -183,7 +187,8 @@ private void initiateAuthorizationDumpTask() throws SemanticException { if 
(RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { Path rangerDumpRoot = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_RANGER_BASE_DIR); LOG.info("Exporting Authorization Metadata from {} at {} ", RANGER_AUTHORIZER, rangerDumpRoot); - RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern); + RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern, + work.getMetricCollector()); Task rangerDumpTask = TaskFactory.get(rangerDumpWork, conf); if (childTasks == null) { childTasks = new ArrayList<>(); @@ -240,7 +245,8 @@ private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) { Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR); Path prevAtlasDumpDir = prevHiveDumpDir == null ? null : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); - AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir); + AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir, + work.getMetricCollector()); Task atlasDumpTask = TaskFactory.get(atlasDumpWork, conf); childTasks = new ArrayList<>(); childTasks.add(atlasDumpTask); @@ -253,6 +259,7 @@ private void finishRemainingTasks() throws SemanticException { + ReplAck.DUMP_ACKNOWLEDGEMENT.toString()); Utils.create(dumpAckFile, conf); prepareReturnValues(work.getResultValues()); + work.getMetricCollector().reportEnd(Status.SUCCESS); deleteAllPreviousDumpMeta(work.getCurrentDumpPath()); } @@ -499,10 +506,14 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; - replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), - evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, maxEventLimit), + long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, + maxEventLimit); + replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, work.eventFrom, work.eventTo, maxEventLimit); replLogger.startLog(); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), estimatedNumEvents); + work.getMetricCollector().reportStageStart(getName(), metricMap); long dumpedCount = resumeFrom - work.eventFrom; if (dumpedCount > 0) { LOG.info("Event id {} to {} are already dumped, skipping {} events", work.eventFrom, resumeFrom, dumpedCount); @@ -522,7 +533,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive replLogger.endLog(lastReplId.toString()); LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); - dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executorId); // If repl policy is changed (oldReplScope is set), then pass the current replication policy, // so that REPL LOAD would drop the tables which are not included in current policy. 
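Note: the executorId stamped into the dump metadata above makes a round trip to the load side, which is how load metrics get correlated with the dump that produced the data. A sketch of the two ends, using only names that appear in this patch:

    // Dump side (ReplDumpTask): 0L when REPL DUMP runs outside the scheduled-query executor.
    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executorId);

    // Load side: ReplicationSemanticAnalyzer reads it back as dmd.getDumpExecutionId(),
    // passes it into ReplLoadWork, and ReplLoadTask.initMetricCollection hands it to the
    // BootstrapLoadMetricCollector / IncrementalLoadMetricCollector constructors.
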
@@ -587,9 +599,26 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive } work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId); return lastReplId; } + private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) { + ReplicationMetricCollector collector; + String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + long maxCacheSize = conf.getLong(MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getVarname(), + (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal()); + if (isBootstrap) { + collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), policy, executorId, + maxCacheSize); + } else { + collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), policy, executorId, + maxCacheSize); + } + return collector; + } + private int getMaxEventAllowed(int currentEventMaxLimit) { int maxDirItems = Integer.parseInt(conf.get(ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG, "0")); if (maxDirItems > 0) { @@ -603,7 +632,6 @@ private int getMaxEventAllowed(int currentEventMaxLimit) { } return currentEventMaxLimit; } - private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException { Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); Retry retriable = new Retry(IOException.class) { @@ -674,6 +702,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cm ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1); replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); } @@ -779,10 +808,16 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throw new HiveException("Replication dump not allowed for replicated database" + " with first incremental dump pending : " + dbName); } + int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size(); + int estimatedNumFunctions = hiveDb.getAllFunctions().size(); replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), - Utils.getAllTables(hiveDb, dbName, work.replScope).size(), - hiveDb.getAllFunctions().size()); + estimatedNumTables, + estimatedNumFunctions); replLogger.startLog(); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions); + work.getMetricCollector().reportStageStart(getName(), metricMap); Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb); Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName); dumpFunctionMetadata(dbName, dbRoot, hiveDb); @@ -841,11 +876,13 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) Long bootDumpEndReplId = currentNotificationId(hiveDb); LOG.info("Preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); - dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + dmd.setDump(DumpType.BOOTSTRAP, 
bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId); dmd.write(true); work.setDirCopyIterator(extTableCopyWorks.iterator()); work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator()); + work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId); return bootDumpBeginReplId; } @@ -912,7 +949,9 @@ Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hive MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); List managedTableCopyPaths = new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false); + exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, + conf, mmCtx).write(false); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) || Utils.shouldDumpMetaDataOnly(conf)) { @@ -1042,6 +1081,7 @@ void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throw FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf); serializer.writeTo(jsonWriter, tuple.replicationSpec); } + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.FUNCTIONS.name(), 1); replLogger.functionLog(functionName); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 86f92338a6..59cae6b9fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ private Path currentDumpPath; private List resultValues; private boolean shouldOverwrite; + private transient ReplicationMetricCollector metricCollector; public static void injectNextDumpDirForTest(String dumpDir) { injectNextDumpDirForTest(dumpDir, false); @@ -190,4 +192,12 @@ public void setResultValues(List resultValues) { public void setShouldOverwrite(boolean shouldOverwrite) { this.shouldOverwrite = shouldOverwrite; } + + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } + + public void setMetricCollector(ReplicationMetricCollector metricCollector) { + this.metricCollector = metricCollector; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 792e331884..c616190c8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -22,9 +22,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplScope; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import 
org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc; @@ -59,7 +61,13 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.IOException; @@ -107,6 +115,7 @@ public int execute() { } work.setRootTask(this); this.parentTasks = null; + work.setMetricCollector(initMetricCollection(!work.isIncrementalLoad(), work.dumpDirectory)); if (shouldLoadAtlasMetadata()) { addAtlasLoadTask(); } @@ -120,14 +129,40 @@ public int execute() { } } catch (RuntimeException e) { LOG.error("replication failed with run time exception", e); + try { + work.getMetricCollector().reportEnd(Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics ", ex); + } throw e; } catch (Exception e) { LOG.error("replication failed", e); setException(e); + try { + work.getMetricCollector().reportEnd(Status.FAILED); + } catch (SemanticException ex) { + LOG.error("Failed to collect Metrics ", ex); + } return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } } + private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, String dumpDirectory) { + ReplicationMetricCollector collector; + String policy = conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME); + long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L); + long maxCacheSize = conf.getLong(MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getVarname(), + (long) MetastoreConf.ConfVars.REPL_METRICS_CACHE_MAXSIZE.getDefaultVal()); + if (isBootstrap) { + collector = new BootstrapLoadMetricCollector(work.dbNameToLoadIn, dumpDirectory, policy, executorId, + work.getDumpExecutionId(), maxCacheSize); + } else { + collector = new IncrementalLoadMetricCollector(work.dbNameToLoadIn, dumpDirectory, policy, executorId, + work.getDumpExecutionId(), maxCacheSize); + } + return collector; + } + private boolean shouldLoadAuthorizationMetadata() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); } @@ -136,7 +171,8 @@ private void initiateAuthorizationLoadTask() throws SemanticException { if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { Path rangerLoadRoot = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR); LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot); - RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn); + RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn, + work.getMetricCollector()); Task rangerLoadTask = TaskFactory.get(rangerLoadWork, conf); if (childTasks == null) { childTasks = new ArrayList<>(); @@ 
-151,7 +187,8 @@ private void initiateAuthorizationLoadTask() throws SemanticException { private void addAtlasLoadTask() throws HiveException { Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir); - AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir); + AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir, + work.getMetricCollector()); Task atlasLoadTask = TaskFactory.get(atlasLoadWork, conf); if (childTasks == null) { childTasks = new ArrayList<>(); @@ -228,7 +265,7 @@ a database ( directory ) tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf)); } else { LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext, - loadTaskTracker); + loadTaskTracker, work.getMetricCollector()); tableTracker = loadTable.tasks(work.isIncrementalLoad()); } @@ -254,7 +291,7 @@ a database ( directory ) // for a table we explicitly try to load partitions as there is no separate partitions events. LoadPartitions loadPartitions = new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent, - work.dbNameToLoadIn, tableContext); + work.dbNameToLoadIn, tableContext, work.getMetricCollector()); TaskTracker partitionsTracker = loadPartitions.tasks(); partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, partitionsTracker); @@ -321,7 +358,7 @@ private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent ne TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); LoadPartitions loadPartitions = new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker, - event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector()); /* the tableTracker here should be a new instance and not an existing one as this can only happen when we break in between loading partitions. 
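Note: as the hunks around this point show, the collector is created once per REPL DUMP / REPL LOAD execution and threaded through every Work object rather than rebuilt per child task. Roughly, using only names from this patch:

    // The root task creates the collector and parks it on its Work object:
    work.setMetricCollector(initMetricCollection(!work.isIncrementalLoad(), work.dumpDirectory));
    // Child works (RangerLoadWork, AtlasLoadWork, LoadTable, LoadPartitions, LoadFunction,
    // ReplStateLogWork) receive the same instance via their constructors and keep it in a
    // transient field, so all sub-tasks report into the same stage counters.
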
@@ -348,7 +385,7 @@ private TaskTracker addLoadConstraintsTasks(Context loadContext, private TaskTracker addLoadFunctionTasks(Context loadContext, BootstrapEventsIterator iterator, BootstrapEvent next, TaskTracker dbTracker, Scope scope) throws IOException, SemanticException { LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(), - (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); + (FunctionEvent) next, work.dbNameToLoadIn, dbTracker, work.getMetricCollector()); TaskTracker functionsTracker = loadFunction.tasks(); if (!scope.database) { scope.rootTasks.addAll(functionsTracker.tasks()); @@ -442,7 +479,7 @@ private void createEndReplLogTask(Context context, Scope scope, Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); dbProps = dbInMetadata.getParameters(); } - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbProps, work.getMetricCollector()); Task replLogTask = TaskFactory.get(replLogWork, conf); if (scope.rootTasks.isEmpty()) { scope.rootTasks.add(replLogTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 26cd59b082..cba990b637 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; @@ -45,6 +47,8 @@ final String dumpDirectory; private boolean lastReplIDUpdated; private String sourceDbName; + private Long dumpExecutionId; + private transient ReplicationMetricCollector metricCollector; private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; @@ -62,12 +66,15 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope, - LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { + LineageState lineageState, boolean isIncrementalDump, Long eventTo, + Long dumpExecutionId) throws IOException, SemanticException { sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; this.currentReplScope = currentReplScope; this.sourceDbName = sourceDbName; + this.dumpExecutionId = dumpExecutionId; + // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name. 
if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) { @@ -77,7 +84,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, rootTask = null; if (isIncrementalDump) { incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory, - new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo); + new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector); /* * If the current incremental dump also includes bootstrap for some tables, then create iterator @@ -87,7 +94,8 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, FileSystem fs = incBootstrapDir.getFileSystem(hiveConf); if (fs.exists(incBootstrapDir)) { this.bootstrapIterator = new BootstrapEventsIterator( - new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, hiveConf); + new Path(incBootstrapDir, EximUtil.METADATA_PATH_NAME).toString(), dbNameToLoadIn, true, + hiveConf, metricCollector); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); } else { this.bootstrapIterator = null; @@ -95,7 +103,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, } } else { this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME) - .toString(), dbNameToLoadIn, true, hiveConf); + .toString(), dbNameToLoadIn, true, hiveConf, metricCollector); this.constraintsIterator = new ConstraintEventsIterator( new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME).toString(), hiveConf); incrementalLoadTasksBuilder = null; @@ -158,4 +166,16 @@ public void setLastReplIDUpdated(boolean lastReplIDUpdated) { public String getSourceDbName() { return sourceDbName; } + + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } + + public void setMetricCollector(ReplicationMetricCollector metricCollector) { + this.metricCollector = metricCollector; + } + + public Long getDumpExecutionId() { + return dumpExecutionId; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java index 7ade7c07d7..1edaaf6289 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hive.ql.exec.repl; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; @@ -34,7 +37,13 @@ @Override public int execute() { - work.replStateLog(); + try { + work.replStateLog(); + } catch (SemanticException e) { + LOG.error("Exception while logging metrics ", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } return 0; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java index 37725d68c6..5e2324c86e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java @@ -19,8 +19,12 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.metastore.TableType; +import 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -43,6 +47,7 @@ private TableType tableType; private String functionName; private String lastReplId; + private final transient ReplicationMetricCollector metricCollector; private enum LOG_TYPE { TABLE, @@ -51,48 +56,59 @@ END } - public ReplStateLogWork(ReplLogger replLogger, String eventId, String eventType) { + public ReplStateLogWork(ReplLogger replLogger, ReplicationMetricCollector metricCollector, + String eventId, String eventType) { this.logType = LOG_TYPE.EVENT; this.replLogger = replLogger; this.eventId = eventId; this.eventType = eventType; + this.metricCollector = metricCollector; } - public ReplStateLogWork(ReplLogger replLogger, String tableName, TableType tableType) { + public ReplStateLogWork(ReplLogger replLogger, ReplicationMetricCollector metricCollector, + String tableName, TableType tableType) { this.logType = LOG_TYPE.TABLE; this.replLogger = replLogger; this.tableName = tableName; this.tableType = tableType; + this.metricCollector = metricCollector; } - public ReplStateLogWork(ReplLogger replLogger, String functionName) { + public ReplStateLogWork(ReplLogger replLogger, String functionName, ReplicationMetricCollector metricCollector) { this.logType = LOG_TYPE.FUNCTION; this.replLogger = replLogger; this.functionName = functionName; + this.metricCollector = metricCollector; } - public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps) { + public ReplStateLogWork(ReplLogger replLogger, Map<String, String> dbProps, + ReplicationMetricCollector metricCollector) { this.logType = LOG_TYPE.END; this.replLogger = replLogger; this.lastReplId = ReplicationSpec.getLastReplicatedStateFromParameters(dbProps); + this.metricCollector = metricCollector; } - public void replStateLog() { + public void replStateLog() throws SemanticException { switch (logType) { case TABLE: { replLogger.tableLog(tableName, tableType); + metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.TABLES.name(), 1); break; } case FUNCTION: { replLogger.functionLog(functionName); + metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.FUNCTIONS.name(), 1); break; } case EVENT: { replLogger.eventLog(eventId, eventType); + metricCollector.reportStageProgress("REPL_LOAD", ReplUtils.MetricName.EVENTS.name(), 1); break; } case END: { replLogger.endLog(lastReplId); + metricCollector.reportStageEnd("REPL_LOAD", Status.SUCCESS, Long.parseLong(lastReplId)); break; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 5bbe20c8c6..a6cb827f21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -23,13 +23,14 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -76,9 +77,12 @@ private final HiveConf hiveConf; private final boolean needLogger; private ReplLogger replLogger; + private final transient ReplicationMetricCollector metricCollector; - public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf) + public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, boolean needLogger, HiveConf hiveConf, + ReplicationMetricCollector metricCollector) throws IOException { + this.metricCollector = metricCollector; Path path = new Path(dumpDirectory); FileSystem fileSystem = path.getFileSystem(hiveConf); if (!fileSystem.exists(path)) { @@ -123,6 +127,7 @@ public boolean hasNext() { if (needLogger) { initReplLogger(); } + initMetricCollector(); } else { return false; } @@ -161,17 +166,16 @@ public ReplLogger replLogger() { return replLogger; } + public ReplicationMetricCollector getMetricCollector() { + return metricCollector; + } + private void initReplLogger() { try { Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); FileSystem fs = dbDumpPath.getFileSystem(hiveConf); - - long numTables = getSubDirs(fs, dbDumpPath).length; - long numFunctions = 0; - Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); - if (fs.exists(funcPath)) { - numFunctions = getSubDirs(fs, funcPath).length; - } + long numTables = getNumTables(dbDumpPath, fs); + long numFunctions = getNumFunctions(dbDumpPath, fs); String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
dbDumpPath.getName() : dbNameToLoadIn; replLogger = new BootstrapLoadLogger(dbName, dumpDirectory, numTables, numFunctions); replLogger.startLog(); @@ -180,6 +184,36 @@ private void initReplLogger() { } } + private long getNumFunctions(Path dbDumpPath, FileSystem fs) throws IOException { + Path funcPath = new Path(dbDumpPath, ReplUtils.FUNCTIONS_ROOT_DIR_NAME); + if (fs.exists(funcPath)) { + return getSubDirs(fs, funcPath).length; + } + return 0; + } + + private long getNumTables(Path dbDumpPath, FileSystem fs) throws IOException { + return getSubDirs(fs, dbDumpPath).length; + } + + private void initMetricCollector() { + try { + Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); + FileSystem fs = dbDumpPath.getFileSystem(hiveConf); + long numTables = getNumTables(dbDumpPath, fs); + long numFunctions = getNumFunctions(dbDumpPath, fs); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) numTables); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) numFunctions); + metricCollector.reportStageStart("REPL_LOAD", metricMap); + } + catch (IOException e) { + // Ignore the exception + } catch (SemanticException e) { + throw new RuntimeException("Failed to collect Metrics ", e); + } + } + FileStatus[] getSubDirs(FileSystem fs, Path dirPath) throws IOException { return fs.listStatus(dirPath, new PathFilter() { @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index 8815eeebe1..667ec7ff31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,19 +56,21 @@ private final FunctionEvent event; private final String dbNameToLoadIn; private final TaskTracker tracker; + private final ReplicationMetricCollector metricCollector; public LoadFunction(Context context, ReplLogger replLogger, FunctionEvent event, - String dbNameToLoadIn, TaskTracker existingTracker) { + String dbNameToLoadIn, TaskTracker existingTracker, ReplicationMetricCollector metricCollector) { this.context = context; this.replLogger = replLogger; this.event = event; this.dbNameToLoadIn = dbNameToLoadIn; this.tracker = new TaskTracker(existingTracker); + this.metricCollector = metricCollector; } private void createFunctionReplLogTask(List<Task<?>> functionTasks, String functionName) { - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName, metricCollector); Task replLogTask = TaskFactory.get(replLogWork, context.hiveConf); DAGTraversal.traverse(functionTasks, new AddDependencyToLeaves(replLogTask)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index b36c4a531f..b78df44e84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; @@ -77,19 +78,22 @@ private final TableEvent event; private final TaskTracker tracker; private final AlterTableAddPartitionDesc lastReplicatedPartition; + private final ReplicationMetricCollector metricCollector; private final ImportTableDesc tableDesc; private Table table; public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, TableEvent event, String dbNameToLoadIn, - TableContext tableContext) throws HiveException { - this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null); + TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException { + this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, + metricCollector); } public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, TaskTracker limiter, TableEvent event, String dbNameToLoadIn, - AlterTableAddPartitionDesc lastReplicatedPartition) throws HiveException { + AlterTableAddPartitionDesc lastReplicatedPartition, + ReplicationMetricCollector metricCollector) throws HiveException { this.tracker = new TaskTracker(limiter); this.event = event; this.context = context; @@ -99,6 +103,7 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext table this.tableDesc = event.tableDesc(dbNameToLoadIn); this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); + this.metricCollector = metricCollector; } public TaskTracker tasks() throws Exception { @@ -118,7 +123,7 @@ public TaskTracker tasks() throws Exception { if (!forNewTable().hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); tracker.addDependentTask(replLogTask); } return tracker; @@ -132,7 +137,7 @@ public TaskTracker tasks() throws Exception { if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) { // Add ReplStateLogTask only if no pending table load tasks left for next cycle Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); tracker.addDependentTask(replLogTask); } return tracker; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 6cea22c01f..9e236fd697 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import 
org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -73,14 +74,16 @@ private final TableContext tableContext; private final TaskTracker tracker; private final TableEvent event; + private final ReplicationMetricCollector metricCollector; public LoadTable(TableEvent event, Context context, ReplLogger replLogger, - TableContext tableContext, TaskTracker limiter) { + TableContext tableContext, TaskTracker limiter, ReplicationMetricCollector metricCollector) { this.event = event; this.context = context; this.replLogger = replLogger; this.tableContext = tableContext; this.tracker = new TaskTracker(limiter); + this.metricCollector = metricCollector; } public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception { @@ -151,7 +154,7 @@ public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception { ); if (!isPartitioned(tableDesc)) { Task replLogTask - = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf); + = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf, metricCollector); ckptTask.addDependentTask(replLogTask); } tracker.addDependentTask(ckptTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 7e844d3164..962e27ce60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.slf4j.Logger; @@ -73,9 +75,11 @@ private final ReplLogger replLogger; private static long numIteration; private final Long eventTo; + private final ReplicationMetricCollector metricCollector; public IncrementalLoadTasksBuilder(String dbName, String loadPath, - IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { + IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo, + ReplicationMetricCollector metricCollector) throws SemanticException { this.dbName = dbName; this.iterator = iterator; inputs = new HashSet<>(); @@ -84,8 +88,12 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, this.conf = conf; replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); replLogger.startLog(); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) iterator.getNumEvents()); + metricCollector.reportStageStart("REPL_LOAD", 
metricMap); this.eventTo = eventTo; numIteration = 0; + this.metricCollector = metricCollector; } public Task build(Context context, Hive hive, Logger log, @@ -135,7 +143,7 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, List> evTasks = analyzeEventLoad(mhContext); if ((evTasks != null) && (!evTasks.isEmpty())) { - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, metricCollector, dir.getPath().getName(), eventDmd.getDumpType().toString()); Task barrierTask = TaskFactory.get(replStateLogWork, conf); @@ -157,7 +165,7 @@ public IncrementalLoadTasksBuilder(String dbName, String loadPath, Map dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); - ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, metricCollector); Task barrierTask = TaskFactory.get(replStateLogWork, conf); taskChainTail.addDependentTask(barrierTask); this.log.debug("Added {}:{} as a precursor of barrier task {}:{}", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index c0aadb5aa2..c6f35c8c9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -134,6 +135,10 @@ LOAD_NEW, LOAD_SKIP, LOAD_REPLACE } + public enum MetricName { + TABLES, FUNCTIONS, EVENTS, POLICIES, TAGS + } + public static Map> genPartSpecs( Table table, List> partitions) throws SemanticException { Map> partSpecs = new HashMap<>(); @@ -167,10 +172,12 @@ return partSpecs; } - public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) + public static Task getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf, + ReplicationMetricCollector metricCollector) throws SemanticException { TableType tableType = tableDesc.isExternal() ? 
TableType.EXTERNAL_TABLE : tableDesc.tableType(); - ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableType); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, metricCollector, + tableDesc.getTableName(), tableType); return TaskFactory.get(replLogWork, conf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 7959df2b2f..76276bc6fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -398,7 +398,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern, replScope.getDbName(), dmd.getReplScope(), - queryState.getLineageState(), evDump, dmd.getEventTo()); + queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } else { LOG.warn("Previous Dump Already Loaded"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java new file mode 100644 index 0000000000..a5463ae875 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/BootstrapDumpMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * BootstrapDumpMetricCollector. + * Bootstrap Dump Metric Collector + */ +public class BootstrapDumpMetricCollector extends ReplicationMetricCollector { + public BootstrapDumpMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, policy, executionId, 0, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java new file mode 100644 index 0000000000..9038b299ac --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/IncrementalDumpMetricCollector.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
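
Aside on the two dump-side collectors added here: both pin the dumpExecutionId argument of the superclass constructor to 0, since a dump run produces that id rather than consuming one; the load-side collectors added further below accept it explicitly. A minimal construction sketch, with hypothetical values (class name and literals are illustrative, not part of the patch):

import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;

public class DumpCollectorSketch {
  public static void main(String[] args) {
    // Hypothetical values: in the patch the execution id comes from the
    // scheduled query run and the cache size from
    // metastore.repl.metrics.cache.maxsize.
    ReplicationMetricCollector collector = new BootstrapDumpMetricCollector(
        "srcdb", "/tmp/staging", "repl_policy", 42L, 10000L);
    // The superclass receives dumpExecutionId = 0 here; the dump generates
    // that id for the load side to pick up via DumpMetaData.
  }
}
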
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * IncrementalDumpMetricCollector. + * Incremental Dump Metric Collector + */ +public class IncrementalDumpMetricCollector extends ReplicationMetricCollector { + public IncrementalDumpMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, policy, executionId, 0, + maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java index e538c79f34..dc40e1df9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java @@ -51,6 +51,7 @@ private boolean initialized = false; private final Path dumpFile; private final HiveConf hiveConf; + private Long dumpExecutionId; public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { this.hiveConf = hiveConf; @@ -60,15 +61,16 @@ public DumpMetaData(Path dumpRoot, HiveConf hiveConf) { public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, HiveConf hiveConf) { this(dumpRoot, hiveConf); - setDump(lvl, eventFrom, eventTo, cmRoot); + setDump(lvl, eventFrom, eventTo, cmRoot, 0L); } - public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot) { + public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId) { this.dumpType = lvl; this.eventFrom = eventFrom; this.eventTo = eventTo; this.cmRoot = cmRoot; this.initialized = true; + this.dumpExecutionId = dumpExecutionId; } public void setPayload(String payload) { @@ -115,11 +117,11 @@ private void loadDumpFromFile() throws SemanticException { br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line; if ((line = br.readLine()) != null) { - String[] lineContents = line.split("\t", 5); + String[] lineContents = line.split("\t", 6); setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]), - new Path(lineContents[3])); - setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]); + new Path(lineContents[3]), Long.valueOf(lineContents[4])); + setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? 
null : lineContents[5]); } else { throw new IOException( "Unable to read valid values from dumpFile:" + dumpFile.toUri().toString()); @@ -158,6 +160,11 @@ public Long getEventTo() throws SemanticException { return eventTo; } + public Long getDumpExecutionId() throws SemanticException { + initializeIfNot(); + return dumpExecutionId; + } + public ReplScope getReplScope() throws SemanticException { initializeIfNot(); return replScope; @@ -207,6 +214,7 @@ public void write(boolean replace) throws SemanticException { eventFrom.toString(), eventTo.toString(), cmRoot.toString(), + dumpExecutionId.toString(), payload) ); if (replScope != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java new file mode 100644 index 0000000000..df60cff69e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/BootstrapLoadMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * BootstrapLoadMetricCollector. + * Bootstrap Load Metric Collector + */ +public class BootstrapLoadMetricCollector extends ReplicationMetricCollector { + public BootstrapLoadMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long dumpExecutionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.BOOTSTRAP, stagingDir, policy, executionId, dumpExecutionId, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java new file mode 100644 index 0000000000..839b3b34ed --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/IncrementalLoadMetricCollector.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
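
For readers tracking the DumpMetaData change above: the dump metadata header line now carries six tab-separated fields instead of five, with the new dump execution id inserted between cmRoot and the payload. An illustrative round trip in plain Java (all values made up; the real reader is loadDumpFromFile):

public class DumpMetaDataLineSketch {
  public static void main(String[] args) {
    // Field layout after this patch: dumpType, eventFrom, eventTo, cmRoot,
    // dumpExecutionId, payload, joined by tabs. The payload slot holds a
    // null placeholder (Utilities.nullStringOutput in the real code) when
    // there is no payload.
    String line = String.join("\t", "BOOTSTRAP", "0", "100", "/cmroot", "7", "NULL");
    String[] fields = line.split("\t", 6);
    long dumpExecutionId = Long.parseLong(fields[4]); // the new fifth field
    System.out.println(dumpExecutionId);
  }
}

Note that dumps written before this change carry only five fields, so the split("\t", 6) reader appears to assume the new layout.
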
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.metric; + +import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata; + +/** + * IncrementalLoadMetricCollector. + * Incremental Load Metric Collector + */ +public class IncrementalLoadMetricCollector extends ReplicationMetricCollector { + public IncrementalLoadMetricCollector(String dbName, String stagingDir, String policy, + long executionId, long dumpExecutionId, long maxCacheSize) { + super(dbName, Metadata.ReplicationType.INCREMENTAL, stagingDir, policy, executionId, dumpExecutionId, maxCacheSize); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java new file mode 100644 index 0000000000..a4f5d9ecb2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricCollector.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * MetricCollector. + * In-memory collection of metrics. + */ +public class MetricCollector { + private static final Logger LOG = LoggerFactory.getLogger(MetricCollector.class); + private Map<Long, ReplicationMetric> metricMap = new ConcurrentHashMap<>(); + private final long maxSize; + + private static volatile MetricCollector instance; + + private MetricCollector(long maxSize) { + this.maxSize = maxSize; + } + + public static MetricCollector getInstance(long maxSize) { + if (instance == null) { + synchronized (MetricCollector.class) { + if (instance == null) { + instance = new MetricCollector(maxSize); + } + } + } + return instance; + } + + public void addMetric(ReplicationMetric replicationMetric) throws SemanticException { + if (metricMap.size() > maxSize) { + throw new SemanticException("Metrics are not getting collected. "); + } else { + if (metricMap.size() > 0.8 * maxSize) { //soft limit + LOG.warn("Metrics cache is 80% full. 
Will start rejecting new metrics once it is full."); + } + metricMap.put(replicationMetric.getScheduledExecutionId(), replicationMetric); + } + } + + public List<ReplicationMetric> getMetrics() { + List<ReplicationMetric> metricList = new ArrayList<>(metricMap.values()); + metricMap.clear(); + return metricList; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java new file mode 100644 index 0000000000..68a27d5832 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric; + +/** + * MetricSink. + * Scheduled thread that polls metrics from the Metric Collector and persists them to the DB. + */ +public class MetricSink { +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java new file mode 100644 index 0000000000..b03d867677 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.*; + +import java.util.Map; + +/** + * Abstract class for Replication Metric Collection. 
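
A note on the cache semantics of MetricCollector, defined just above: the singleton is sized by whichever getInstance() call happens first; addMetric() keys entries by scheduled execution id, so repeated reports for the same run overwrite in place; a warning is logged past 80% occupancy; the add throws once the map exceeds maxSize; and getMetrics() drains the cache for the MetricSink stub introduced above. A small sketch, with illustrative values:

import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;

public class MetricCacheSketch {
  public static void main(String[] args) throws Exception {
    // Sized on first use; later maxSize arguments are ignored.
    MetricCollector cache = MetricCollector.getInstance(10000L);
    Metadata metadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging");
    // Keyed by scheduled execution id, so updates for one run do not grow the map.
    cache.addMetric(new ReplicationMetric(1L, "repl_policy", 0L, metadata));
    // Draining returns a snapshot and clears the map.
    System.out.println(cache.getMetrics().size()); // 1
  }
}
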
+ */ +public abstract class ReplicationMetricCollector { + private ReplicationMetric replicationMetric; + private MetricCollector metricCollector; + private boolean isEnabled; + + public ReplicationMetricCollector(String dbName, Metadata.ReplicationType replicationType, + String stagingDir, String policy, long executionId, + long dumpExecutionId, long maxCacheSize) { + if (!StringUtils.isEmpty(policy) && executionId > 0) { + isEnabled = true; + metricCollector = MetricCollector.getInstance(maxCacheSize); + Metadata metadata = new Metadata(dbName, replicationType, stagingDir); + replicationMetric = new ReplicationMetric(executionId, policy, dumpExecutionId, metadata); + } + } + + public void reportStageStart(String stageName, Map metricMap) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = new Stage(stageName, Status.IN_PROGRESS, System.currentTimeMillis()); + for (String metricName : metricMap.keySet()) { + stage.addMetric(new Metric(metricName, metricMap.get(metricName))); + } + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + + public void reportStageEnd(String stageName, Status status, long lastReplId) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); + progress.addStage(stage); + replicationMetric.setProgress(progress); + Metadata metadata = replicationMetric.getMetadata(); + metadata.setLastReplId(lastReplId); + replicationMetric.setMetadata(metadata); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportStageEnd(String stageName, Status status) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportStageProgress(String stageName, String metricName, long count) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + Metric metric = stage.getMetricByName(metricName); + metric.setCurrentCount(metric.getCurrentCount() + count); + if (metric.getCurrentCount() > metric.getTotalCount()) { + metric.setTotalCount(metric.getCurrentCount()); + } + stage.addMetric(metric); + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + + public void reportEnd(Status status) throws SemanticException { + if (isEnabled) { + Progress progress = replicationMetric.getProgress(); + progress.setStatus(status); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java new file mode 100644 index 0000000000..8292560145 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
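
The abstract collector above is the whole client-facing surface of the feature: declare a stage with its expected totals, report progress against named counters, then close the stage and finally the run. A usage sketch mirroring the unit tests added later in this patch (stage name and counts are illustrative):

import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;

import java.util.HashMap;
import java.util.Map;

public class CollectorLifecycleSketch {
  public static void main(String[] args) throws Exception {
    ReplicationMetricCollector collector = new BootstrapDumpMetricCollector(
        "db", "staging", "repl_policy", 1L, 10000L);

    // 1. Declare the stage and the total units of work it expects.
    Map<String, Long> totals = new HashMap<>();
    totals.put(ReplUtils.MetricName.TABLES.name(), 10L);
    collector.reportStageStart("dump", totals);

    // 2. Report incremental progress as work completes.
    collector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 3L);

    // 3. Close the stage, recording the last replicated event id, then the run.
    collector.reportStageEnd("dump", Status.SUCCESS, 100L);
    collector.reportEnd(Status.SUCCESS);
  }
}

Note that the constructor silently disables collection when the policy name is empty or the execution id is not positive, which is what the testFailureNoScheduledId and testFailureNoPolicyId cases below assert.
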
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the metadata info for replication metrics. + */ +public class Metadata { + public enum ReplicationType { + BOOTSTRAP, + INCREMENTAL + } + private String dbName; + private ReplicationType replicationType; + private String stagingDir; + private long lastReplId; + + public Metadata(String dbName, ReplicationType replicationType, String stagingDir) { + this.dbName = dbName; + this.replicationType = replicationType; + this.stagingDir = stagingDir; + } + + public long getLastReplId() { + return lastReplId; + } + + public String getDbName() { + return dbName; + } + + public ReplicationType getReplicationType() { + return replicationType; + } + + public String getStagingDir() { + return stagingDir; + } + + public void setLastReplId(long lastReplId) { + this.lastReplId = lastReplId; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java new file mode 100644 index 0000000000..fdd6fa403f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metric.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the unit metric. 
+ */ +public class Metric { + private String name; + private long currentCount; + private long totalCount; + + public Metric(String name, long totalCount) { + this.name = name; + this.totalCount = totalCount; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public long getCurrentCount() { + return currentCount; + } + + public void setCurrentCount(long currentCount) { + this.currentCount = currentCount; + } + + public long getTotalCount() { + return totalCount; + } + + public void setTotalCount(long totalCount) { + this.totalCount = totalCount; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java new file mode 100644 index 0000000000..b4e0f48102 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Progress.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Class for defining the progress info for replication metrics. + */ +public class Progress { + + private Status status; + + private Map stages = new ConcurrentHashMap<>(); + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public void addStage(Stage stage) { + stages.put(stage.getName(), stage); + } + + public Stage getStageByName(String stageName) { + return stages.get(stageName); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java new file mode 100644 index 0000000000..cc833613be --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/ReplicationMetric.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
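
The event model reads as a small tree: a ReplicationMetric (defined next) owns a Progress, Progress keys Stage objects by name, and each Stage keys Metric counters by name, so addStage and addMetric behave as idempotent upserts. A sketch using only these event classes (stage and counter names are illustrative; Stage and Status are defined just after this point in the patch):

import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Progress;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;

public class EventModelSketch {
  public static void main(String[] args) {
    Stage dump = new Stage("dump", Status.IN_PROGRESS, System.currentTimeMillis());
    Metric tables = new Metric("TABLES", 10L); // totalCount = 10, currentCount = 0
    tables.setCurrentCount(3L);
    dump.addMetric(tables);

    Progress progress = new Progress();
    progress.addStage(dump);
    // Lookup by name mirrors ReplicationMetricCollector.reportStageProgress().
    long done = progress.getStageByName("dump").getMetricByName("TABLES").getCurrentCount();
    System.out.println(done); // 3
  }
}
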
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Class for defining the replication metrics. + */ +public class ReplicationMetric { + private long scheduledExecutionId; + private String policy; + private long dumpExecutionId; + private Metadata metadata; + private Progress progress; + + public ReplicationMetric(long scheduledExecutionId, String policy, long dumpExecutionId, Metadata metadata){ + this.scheduledExecutionId = scheduledExecutionId; + this.policy = policy; + this.dumpExecutionId = dumpExecutionId; + this.metadata = metadata; + this.progress = new Progress(); + } + + public long getScheduledExecutionId() { + return scheduledExecutionId; + } + + + public String getPolicy() { + return policy; + } + + public long getDumpExecutionId() { + return dumpExecutionId; + } + + public Progress getProgress() { + return progress; + } + + public void setMetadata(Metadata metadata) { + this.metadata = metadata; + } + + public Metadata getMetadata() { + return metadata; + } + + public void setProgress(Progress progress) { + this.progress = progress; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java new file mode 100644 index 0000000000..4a243034ef --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +import java.util.HashMap; +import java.util.Map; + +/** + * Class for defining the different stages of replication. 
+ */ +public class Stage { + private String name; + private Status status; + private long startTime; + private long endTime; + private Map metrics = new HashMap<>(); + + public Stage(String name, Status status, long startTime) { + this.name = name; + this.status = status; + this.startTime = startTime; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + + public void addMetric(Metric metric) { + this.metrics.put(metric.getName(), metric); + } + + public Metric getMetricByName(String name) { + return this.metrics.get(name); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java new file mode 100644 index 0000000000..96cf565f76 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric.event; + +/** + * Enum to define the status. 
+ */ +public enum Status { + SUCCESS, + FAILED, + IN_PROGRESS +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java index ca120933dc..3cbaa60bdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java @@ -223,6 +223,10 @@ private void processQuery(ScheduledQueryPollResponse q) { HiveConf conf = new HiveConf(context.conf); conf.set(Constants.HIVE_QUERY_EXCLUSIVE_LOCK, lockNameFor(q.getScheduleKey())); conf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, SessionStateUserAuthenticator.class.getName()); + conf.set(Constants.SCHEDULED_QUERY_NAMESPACE, q.getScheduleKey().getClusterNamespace()); + conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, q.getScheduleKey().getScheduleName()); + conf.set(Constants.SCHEDULED_QUERY_USER, q.getUser()); + conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, Long.toString(q.getExecutionId())); conf.unset(HiveConf.ConfVars.HIVESESSIONID.varname); state = new SessionState(conf, q.getUser()); state.setIsHiveServerQuery(true); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java new file mode 100644 index 0000000000..e8d2c85d61 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.parse.repl.metric; + +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.*; + + +/** + * Unit Test class for In Memory Replication Metric Collection. 
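
The four scheduled.query.* properties set in ScheduledQueryExecutionService above are what let a later REPL DUMP or REPL LOAD discover that it is running under a schedule, and with which execution id; a collector built with an empty policy or a non-positive execution id stays disabled. A hypothetical reading side, not part of this patch hunk:

import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;

public class ScheduledQueryConfSketch {
  // Hypothetical helpers showing how a downstream repl task could read back
  // the identity that the scheduler stamped onto the session conf.
  static long scheduledExecutionId(HiveConf conf) {
    // 0 (or any non-positive id) leaves ReplicationMetricCollector disabled.
    return Long.parseLong(conf.get(Constants.SCHEDULED_QUERY_EXECUTIONID, "0"));
  }

  static String scheduleName(HiveConf conf) {
    return conf.get(Constants.SCHEDULED_QUERY_SCHEDULENAME, "");
  }
}
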
+ */ +@RunWith(MockitoJUnitRunner.class) +public class TestReplicationMetricCollector { + + + + @Before + public void setup() throws Exception { + + } + + @Test + public void testFailureCacheHardLimit() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + + ReplicationMetricCollector incrDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 2, 1); + metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.EVENTS.name(), (long) 10); + incrDumpMetricCollector.reportStageStart("dump", metricMap); + try { + incrDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.fail(); + } catch (SemanticException e) { + Assert.assertEquals("Metrics are not getting collected. ", e.getMessage()); + } + } + + @Test + public void testFailureNoScheduledId() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 0, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.assertEquals(0, MetricCollector.getInstance(1).getMetrics().size()); + } + + @Test + public void testFailureNoPolicyId() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "", 0, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS); + Assert.assertEquals(0, MetricCollector.getInstance(1).getMetrics().size()); + } + + @Test + public void testSuccessBootstrapDumpMetrics() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + bootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + bootstrapDumpMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new 
Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0, expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", Arrays.asList(ReplUtils.MetricName.TABLES.name(), + ReplUtils.MetricName.FUNCTIONS.name())); + } + + @Test + public void testSuccessIncrDumpMetrics() throws Exception { + ReplicationMetricCollector incrDumpMetricCollector = new IncrementalDumpMetricCollector("db", + "staging", "repl", 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + incrDumpMetricCollector.reportStageStart("dump", metricMap); + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + incrDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + incrDumpMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0, + expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", + Arrays.asList(ReplUtils.MetricName.TABLES.name(), + ReplUtils.MetricName.FUNCTIONS.name())); + } + + @Test + public void testSuccessBootstrapLoadMetrics() throws Exception { + ReplicationMetricCollector bootstrapLoadMetricCollector = new BootstrapLoadMetricCollector("db", + "staging", "repl", 1, 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapLoadMetricCollector.reportStageStart("dump", 
metricMap); + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + bootstrapLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + bootstrapLoadMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + bootstrapLoadMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.BOOTSTRAP, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 1, + expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", + Arrays.asList(ReplUtils.MetricName.TABLES.name(), + ReplUtils.MetricName.FUNCTIONS.name())); + } + + @Test + public void testSuccessIncrLoadMetrics() throws Exception { + ReplicationMetricCollector incrLoadMetricCollector = new IncrementalLoadMetricCollector("db", + "staging", "repl", 1, 1, 1); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + incrLoadMetricCollector.reportStageStart("dump", metricMap); + incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1); + List actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2); + incrLoadMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + incrLoadMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10); + incrLoadMetricCollector.reportEnd(Status.SUCCESS); + actualMetrics = MetricCollector.getInstance(1).getMetrics(); + Assert.assertEquals(1, actualMetrics.size()); + + Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.INCREMENTAL, "staging"); + expectedMetadata.setLastReplId(10); + Progress expectedProgress = new Progress(); + expectedProgress.setStatus(Status.SUCCESS); + Stage dumpStage = new Stage("dump", Status.SUCCESS, 0); + dumpStage.setEndTime(0); + Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10); + expectedTableMetric.setCurrentCount(3); + Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1); + expectedFuncMetric.setCurrentCount(1); + 
dumpStage.addMetric(expectedTableMetric); + dumpStage.addMetric(expectedFuncMetric); + expectedProgress.addStage(dumpStage); + ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 1, + expectedMetadata); + expectedMetric.setProgress(expectedProgress); + checkSuccess(actualMetrics.get(0), expectedMetric, "dump", + Arrays.asList(ReplUtils.MetricName.TABLES.name(), + ReplUtils.MetricName.FUNCTIONS.name())); + } + + private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, String stageName, + List<String> metricNames) { + Assert.assertEquals(expected.getDumpExecutionId(), actual.getDumpExecutionId()); + Assert.assertEquals(expected.getPolicy(), actual.getPolicy()); + Assert.assertEquals(expected.getScheduledExecutionId(), actual.getScheduledExecutionId()); + Assert.assertEquals(expected.getMetadata().getReplicationType(), actual.getMetadata().getReplicationType()); + Assert.assertEquals(expected.getMetadata().getDbName(), actual.getMetadata().getDbName()); + Assert.assertEquals(expected.getMetadata().getStagingDir(), actual.getMetadata().getStagingDir()); + Assert.assertEquals(expected.getMetadata().getLastReplId(), actual.getMetadata().getLastReplId()); + Assert.assertEquals(expected.getProgress().getStatus(), actual.getProgress().getStatus()); + Assert.assertEquals(expected.getProgress().getStageByName(stageName).getStatus(), + actual.getProgress().getStageByName(stageName).getStatus()); + for (String metricName : metricNames) { + Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount(), + actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getTotalCount()); + Assert.assertEquals(expected.getProgress().getStageByName(stageName).getMetricByName(metricName).getCurrentCount(), + actual.getProgress().getStageByName(stageName).getMetricByName(metricName).getCurrentCount()); + } + } + +} \ No newline at end of file diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index d1db106270..db1017beda 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -963,6 +963,17 @@ public static ConfVars getMetaConf(String name) { "hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/, "Maximum file size (in bytes) that Hive uses to do single HDFS copies between directories." + "Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster."), + REPL_METRICS_CACHE_MAXSIZE("metastore.repl.metrics.cache.maxsize", + "hive.repl.metrics.cache.maxsize", 10000 /*10000 rows */, + "Maximum number of replication metrics kept in the in-memory cache. Metrics are pushed to persistent" + + " storage at the frequency defined by hive.repl.metrics.update.frequency. Until they are persisted to" + + " the database, metrics are held in this cache, so size this property based on the number of" + + " concurrently running replication policies and the persistence frequency. " + ), + REPL_METRICS_UPDATE_FREQUENCY("metastore.repl.metrics.update.frequency", + "hive.repl.metrics.update.frequency", 1 /*1 minute */, + "Frequency, in minutes, at which replication metrics will be written to persistent storage. 
" + ), SCHEMA_INFO_CLASS("metastore.schema.info.class", "hive.metastore.schema.info.class", "org.apache.hadoop.hive.metastore.MetaStoreSchemaInfo", "Fully qualified class name for the metastore schema information class \n"