Subject: [PATCH] HIVE-26606: Expose failover states in replication metrics
---
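Notes (kept out of the commit message): a failover / reverse-replication cycle previously
reported both of its dumps as plain INCREMENTAL. This patch adds two dump types,
PRE_OPTIMIZED_BOOTSTRAP (the first reverse dump, which only writes the event_ack file) and
OPTIMIZED_BOOTSTRAP (the second reverse dump, taken after the table diff), plus matching
dump- and load-side metric collectors, so each failover state is visible in the replication
metrics. On load of a PRE_OPTIMIZED_BOOTSTRAP dump, the dump metadata is flipped to
OPTIMIZED_BOOTSTRAP via DumpMetaData.setOptimizedBootstrapToDumpMetadataFile(). For
reviewers, the load-side collector selection reduces to the sketch below; it is a condensed
restatement of initMetricCollection in ReplicationSemanticAnalyzer (dbName, dumpDir, dmd and
conf stand for the surrounding method's parameters and fields), not additional behavior:

    ReplicationMetricCollector collector;
    if (dmd.isPreOptimizedBootstrapDump()) {
      // first reverse dump after failover: event_ack only
      collector = new PreOptimizedBootstrapLoadMetricCollector(dbName, dumpDir, dmd.getDumpExecutionId(), conf);
    } else if (dmd.isOptimizedBootstrapDump()) {
      // second reverse dump, taken once the table diff is present
      collector = new OptimizedBootstrapLoadMetricCollector(dbName, dumpDir, dmd.getDumpExecutionId(), conf);
    } else if (dmd.isBootstrapDump()) {
      collector = new BootstrapLoadMetricCollector(dbName, dumpDir, dmd.getDumpExecutionId(), conf);
    } else {
      collector = new IncrementalLoadMetricCollector(dbName, dumpDir, dmd.getDumpExecutionId(), conf);
    }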
Index: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -397,7 +397,7 @@
     assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
     dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
@@ -625,7 +625,7 @@
     dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -734,7 +734,7 @@
     Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(MetaStoreUtils.isTargetOfReplication(db));
@@ -748,7 +748,7 @@
     assertTrue(fs.exists(new Path(preFailoverDumpData.dumpLocation)));
     assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
     assertTrue(fs.exists(dumpAckFile));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -260,7 +260,7 @@
     LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath,
         dbEventId, targetDbEventId);
     work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
-    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
+    dmd.setDump(DumpType.PRE_OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, -1L, false);
     dmd.write(true);
     return lastReplId;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -88,6 +88,8 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.PreOptimizedBootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -199,7 +201,7 @@
       if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
         LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. ");
         Path nonRecoverableFile = new Path(latestDumpPath, NON_RECOVERABLE_MARKER.toString());
-        ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf);
+        ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, nonRecoverableFile.toString(), conf, false);
         setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format()));
         return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
       }
@@ -226,7 +228,7 @@
         String mapRedCustomName = ReplUtils.getDistCpCustomName(conf, work.dbNameOrPattern);
         conf.set(JobContext.JOB_NAME, mapRedCustomName);
         work.setCurrentDumpPath(currentDumpPath);
-        work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot));
+        work.setMetricCollector(initMetricCollection(work.isBootstrap(), hiveDumpRoot, isFailover));
         if (shouldDumpAtlasMetadata()) {
           addAtlasDumpTask(work.isBootstrap(), previousValidHiveDumpPath);
           LOG.info("Added task to dump atlas metadata.");
@@ -254,6 +256,7 @@
           LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern,
               dbEventId);
           lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
+          ReplUtils.reportStatusInReplicationMetrics("PRE_OPTIMIZED_BOOTSTRAP", Status.SUCCESS, null, conf, isFailover);
           finishRemainingTasks();
         } else {
           // We should be here only if TableDiff is Present.
@@ -265,7 +268,7 @@
           assert isTableDiffDirectoryPresent;
 
           work.setSecondDumpAfterFailover(true);
-
+          work.setMetricCollector(new OptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf));
           long fromEventId = Long.parseLong(getEventIdFromFile(previousValidHiveDumpPath.getParent(), conf)[1]);
           LOG.info("Starting optimised bootstrap from event id {} for database {}", fromEventId,
               work.dbNameOrPattern);
@@ -303,7 +306,7 @@
         } else {
           LOG.info("Previous Dump is not yet loaded. Skipping this iteration.");
         }
-        ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf);
+        ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf, false);
       }
     }
   } catch (RuntimeException e) {
@@ -906,8 +909,13 @@
     } finally {
       //write the dmd always irrespective of success/failure to enable checkpointing in table level replication
       long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-      dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
-          previousReplScopeModified());
+      if (work.isSecondDumpAfterFailover()) {
+        dmd.setDump(DumpType.OPTIMIZED_BOOTSTRAP, work.eventFrom, lastReplId, cmRoot, executionId,
+            previousReplScopeModified());
+      } else {
+        dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
+            previousReplScopeModified());
+      }
       // If repl policy is changed (oldReplScope is set), then pass the current replication policy,
       // so that REPL LOAD would drop the tables which are not included in current policy.
       dmd.setReplScope(work.replScope);
@@ -1045,12 +1054,16 @@
     }
   }
 
-  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) {
+  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot, boolean isFailover) {
     ReplicationMetricCollector collector;
     if (isBootstrap) {
       collector = new BootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
     } else {
-      collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+      if (isFailover) {
+        collector = new PreOptimizedBootstrapDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+      } else {
+        collector = new IncrementalDumpMetricCollector(work.dbNameOrPattern, dumpRoot.toString(), conf);
+      }
     }
     return collector;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.thrift.TException;
 import com.google.common.collect.Collections2;
 import org.apache.commons.lang3.StringUtils;
@@ -799,7 +801,10 @@
       if (this.childTasks == null) {
         this.childTasks = new ArrayList<>();
       }
+      work.getMetricCollector().reportStageStart("PRE_OPTIMIZED_BOOTSTRAP", new HashMap<>());
       createReplLoadCompleteAckTask();
+      work.getMetricCollector().reportStageEnd("PRE_OPTIMIZED_BOOTSTRAP", Status.SUCCESS);
+      work.getMetricCollector().reportEnd(Status.SUCCESS);
       return 0;
     } else if (work.isSecondFailover) {
       // DROP the tables extra on target, which are not on source cluster.
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -68,6 +68,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
@@ -533,14 +534,19 @@
   }
 
   public static void reportStatusInReplicationMetrics(String stageName, Status status, String errorLogPath,
-                                                      HiveConf conf)
+                                                      HiveConf conf, boolean isFailover)
       throws SemanticException {
-    ReplicationMetricCollector metricCollector =
-        new ReplicationMetricCollector(null, null, null, 0, conf) {};
+    ReplicationMetricCollector metricCollector;
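+    // Failover (pre-optimized bootstrap) status goes through the dedicated collector so the
+    // metric carries ReplicationType.PRE_OPTIMIZED_BOOTSTRAP rather than no replication type.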
+    if (isFailover) {
+      metricCollector = new PreOptimizedBootstrapLoadMetricCollector(null, null, -1, conf);
+    } else {
+      metricCollector = new ReplicationMetricCollector(null, null, null, 0, conf) {};
+    }
     metricCollector.reportStageStart(stageName, new HashMap<>());
     metricCollector.reportStageEnd(stageName, status, errorLogPath);
   }
-
   public static boolean isErrorRecoverable(Throwable e) {
     int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     return errorCode > ErrorMsg.GENERIC_ERROR.getErrorCode();
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -43,6 +43,8 @@
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.OptimizedBootstrapLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -98,7 +100,7 @@
         analyzeReplDump(ast);
       } catch (SemanticException e) {
         ReplUtils.reportStatusInReplicationMetrics("REPL_DUMP", ReplUtils.isErrorRecoverable(e)
-            ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+            ? Status.FAILED_ADMIN : Status.FAILED, null, conf, false);
         throw e;
       }
       break;
@@ -110,7 +112,7 @@
         } catch (SemanticException e) {
           if (!e.getMessage().equals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg())) {
             ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", ReplUtils.isErrorRecoverable(e)
-                ? Status.FAILED_ADMIN : Status.FAILED, null, conf);
+                ? Status.FAILED_ADMIN : Status.FAILED, null, conf, false);
           }
           throw e;
         }
@@ -318,7 +320,7 @@
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
     try {
-      assert(sourceDbNameOrPattern != null);
+      assert (sourceDbNameOrPattern != null);
       Path loadPath = getCurrentLoadPath();
 
       // Now, the dumped path can be one of three things:
@@ -342,7 +344,7 @@
       if (ReplUtils.failedWithNonRecoverableError(latestDumpPath, conf)) {
         Path nonRecoverableFile = new Path(latestDumpPath, ReplAck.NON_RECOVERABLE_MARKER.toString());
         ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED,
-            nonRecoverableFile.toString(), conf);
+            nonRecoverableFile.toString(), conf, false);
         throw new Exception(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg());
       }
       if (loadPath != null) {
@@ -350,21 +352,24 @@
         boolean evDump = false;
         // we will decide what hdfs locations needs to be copied over here as well.
-        if (dmd.isIncrementalDump()) {
-          LOG.debug("{} contains an incremental dump", loadPath);
+        if (dmd.isIncrementalDump() || dmd.isOptimizedBootstrapDump() || dmd.isPreOptimizedBootstrapDump()) {
+          LOG.debug("{} contains an incremental or optimized bootstrap dump", loadPath);
           evDump = true;
         } else {
           LOG.debug("{} contains an bootstrap dump", loadPath);
         }
+
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern,
             replScope.getDbName(), dmd.getReplScope(), queryState.getLineageState(), evDump, dmd.getEventTo(),
             dmd.getDumpExecutionId(),
-            initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(),
-                dmd.getDumpExecutionId()), dmd.isReplScopeModified());
+            initMetricCollection(loadPath.toString(), replScope.getDbName(), dmd), dmd.isReplScopeModified());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
+        if (dmd.isPreOptimizedBootstrapDump()) {
+          dmd.setOptimizedBootstrapToDumpMetadataFile();
+        }
       } else {
-        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf);
+        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, false);
         LOG.warn("Previous Dump Already Loaded");
       }
     } catch (Exception e) {
@@ -373,13 +378,18 @@
     }
   }
 
-  private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, String dumpDirectory,
-                                                          String dbNameToLoadIn, long dumpExecutionId) {
+  private ReplicationMetricCollector initMetricCollection(String dumpDirectory,
+                                                          String dbNameToLoadIn, DumpMetaData dmd) throws SemanticException {
+
     ReplicationMetricCollector collector;
-    if (isBootstrap) {
-      collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+    if (dmd.isPreOptimizedBootstrapDump()) {
+      collector = new PreOptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+    } else if (dmd.isOptimizedBootstrapDump()) {
+      collector = new OptimizedBootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
+    } else if (dmd.isBootstrapDump()) {
+      collector = new BootstrapLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
     } else {
-      collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dumpExecutionId, conf);
+      collector = new IncrementalLoadMetricCollector(dbNameToLoadIn, dumpDirectory, dmd.getDumpExecutionId(), conf);
     }
     return collector;
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -200,6 +200,18 @@
       return new DefaultHandler();
     }
   },
+  PRE_OPTIMIZED_BOOTSTRAP("PRE_OPTIMIZED_BOOTSTRAP") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
+  OPTIMIZED_BOOTSTRAP("OPTIMIZED_BOOTSTRAP") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
   EVENT_CREATE_DATABASE("EVENT_CREATE_DATABASE") {
     @Override
     public MessageHandler handler() {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java
new file mode 100644
--- /dev/null (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/OptimizedBootstrapDumpMetricCollector.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+
+
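+/**
+ * OptimizedBootstrapDumpMetricCollector.
+ * Collects dump metrics for the second (optimized bootstrap) dump of reverse replication after failover.
+ */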
+public class OptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+  public OptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, 0, conf);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java
new file mode 100644
--- /dev/null (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/metric/PreOptimizedBootstrapDumpMetricCollector.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+
+
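+/**
+ * PreOptimizedBootstrapDumpMetricCollector.
+ * Collects dump metrics for the first (pre-optimized bootstrap) dump of reverse replication after failover.
+ */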
+public class PreOptimizedBootstrapDumpMetricCollector extends ReplicationMetricCollector {
+  public PreOptimizedBootstrapDumpMetricCollector(String dbName, String stagingDir, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, 0, conf);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -193,11 +194,32 @@
     return DUMP_METADATA;
   }
 
+  public boolean isBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.BOOTSTRAP);
+  }
+
   public boolean isIncrementalDump() throws SemanticException {
     initializeIfNot();
     return (this.dumpType == DumpType.INCREMENTAL);
   }
 
+  public boolean isPreOptimizedBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+  }
+
+  public boolean isOptimizedBootstrapDump() throws SemanticException {
+    initializeIfNot();
+    return (this.dumpType == DumpType.OPTIMIZED_BOOTSTRAP);
+  }
+
+  public void setOptimizedBootstrapToDumpMetadataFile() throws SemanticException {
+    assert (this.getDumpType() == DumpType.PRE_OPTIMIZED_BOOTSTRAP);
+    this.setDump(DumpType.OPTIMIZED_BOOTSTRAP, -1L, -1L, null, -1L, false);
+    this.write(true);
+  }
+
   private void initializeIfNot() throws SemanticException {
     if (!initialized) {
       loadDumpFromFile();
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java
new file mode 100644
--- /dev/null (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/OptimizedBootstrapLoadMetricCollector.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+
+/**
+ * OptimizedBootstrapLoadMetricCollector.
+ * Optimized Bootstrap Load Metric Collector.
+ */
+public class OptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+  public OptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java
new file mode 100644
--- /dev/null (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/metric/PreOptimizedBootstrapLoadMetricCollector.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.metric;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
+
+
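+/**
+ * PreOptimizedBootstrapLoadMetricCollector.
+ * Collects load metrics for the first (pre-optimized bootstrap) dump of reverse replication after failover.
+ */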
+public class PreOptimizedBootstrapLoadMetricCollector extends ReplicationMetricCollector {
+  public PreOptimizedBootstrapLoadMetricCollector(String dbName, String stagingDir, long dumpExecutionId, HiveConf conf) {
+    super(dbName, Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, stagingDir, dumpExecutionId, conf);
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -26,7 +26,9 @@
    */
   public enum ReplicationType {
     BOOTSTRAP,
-    INCREMENTAL
+    INCREMENTAL,
+    PRE_OPTIMIZED_BOOTSTRAP,
+    OPTIMIZED_BOOTSTRAP
   }
   private String dbName;
   private ReplicationType replicationType;
Index: ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java (revision 2031af314e70f3b8e07add13cb65416c29956181)
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java (revision 3c8b191993d14b8d2f4f45f77a314199f1f4a8c8)
@@ -29,9 +29,11 @@
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.BootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.dump.metric.IncrementalDumpMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.metric.OptimizedBootstrapDumpMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.IncrementalLoadMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.load.metric.PreOptimizedBootstrapLoadMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata;
@@ -230,6 +232,87 @@
     checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
         Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
   }
+
+  @Test
+  public void testSuccessPreOptimizedBootstrapDumpMetrics() throws Exception {
+    ReplicationMetricCollector preOptimizedBootstrapDumpMetricCollector = new PreOptimizedBootstrapLoadMetricCollector("db",
+        "dummyDir", -1, conf);
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 0);
+    metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 0);
+    preOptimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+    preOptimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 0);
+    List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    preOptimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, -1, new SnapshotUtils.ReplSnapshotCount(),
+        new ReplStatsTracker(0));
+    preOptimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.PRE_OPTIMIZED_BOOTSTRAP, "dummyDir");
+    expectedMetadata.setLastReplId(-1);
+    Progress expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 0);
+    expectedTableMetric.setCurrentCount(0);
+    Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 0);
+    expectedFuncMetric.setCurrentCount(0);
+    dumpStage.addMetric(expectedTableMetric);
+    dumpStage.addMetric(expectedFuncMetric);
+    expectedProgress.addStage(dumpStage);
+    ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", -1, expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+        Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+  }
+
+  @Test
+  public void testSuccessOptimizedBootstrapDumpMetrics() throws Exception {
+    ReplicationMetricCollector optimizedBootstrapDumpMetricCollector = new OptimizedBootstrapDumpMetricCollector("db",
+        "dummyDir", conf);
+    Map<String, Long> metricMap = new HashMap<>();
+    metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10);
+    metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1);
+    optimizedBootstrapDumpMetricCollector.reportStageStart("dump", metricMap);
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 1);
+    List<ReplicationMetric> actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.TABLES.name(), 2);
+    optimizedBootstrapDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    optimizedBootstrapDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10, new SnapshotUtils.ReplSnapshotCount(),
+        new ReplStatsTracker(0));
+    optimizedBootstrapDumpMetricCollector.reportEnd(Status.SUCCESS);
+    actualMetrics = MetricCollector.getInstance().getMetrics();
+    Assert.assertEquals(1, actualMetrics.size());
+
+    Metadata expectedMetadata = new Metadata("db", Metadata.ReplicationType.OPTIMIZED_BOOTSTRAP, "dummyDir");
+    expectedMetadata.setLastReplId(10);
+    Progress expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.SUCCESS);
+    Stage dumpStage = new Stage("dump", Status.SUCCESS, 0);
+    dumpStage.setEndTime(0);
+    Metric expectedTableMetric = new Metric(ReplUtils.MetricName.TABLES.name(), 10);
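+    // reportStageProgress is cumulative for a metric: the two TABLES updates (1, then 2) add up to 3.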
+    expectedTableMetric.setCurrentCount(3);
+    Metric expectedFuncMetric = new Metric(ReplUtils.MetricName.FUNCTIONS.name(), 1);
+    expectedFuncMetric.setCurrentCount(1);
+    dumpStage.addMetric(expectedTableMetric);
+    dumpStage.addMetric(expectedFuncMetric);
+    expectedProgress.addStage(dumpStage);
+    ReplicationMetric expectedMetric = new ReplicationMetric(1, "repl", 0,
+        expectedMetadata);
+    expectedMetric.setProgress(expectedProgress);
+    checkSuccess(actualMetrics.get(0), expectedMetric, "dump",
+        Arrays.asList(ReplUtils.MetricName.TABLES.name(), ReplUtils.MetricName.FUNCTIONS.name()));
+  }
 
   @Test
   public void testFailoverReadyDumpMetrics() throws Exception {