diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 8c78da9c93..d454a99b90 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -620,7 +620,8 @@ "from some other path : {1}.", true), REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true), REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true), - REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true) + REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true), + REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with a non-recoverable error. Needs manual intervention.") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index d735c9bd20..f1e233a539 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -19,6 +19,8 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -35,20 +37,17 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; @@ -65,6 +64,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -1608,6 +1608,8 @@ public void testRangerReplicationRetryExhausted() throws Throwable { "'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION + "'='30s'", "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL + "'='false'", "'" + HiveConf.ConfVars.HIVE_IN_TEST + "'='false'"); + List<String> testClause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'", + "'hive.in.test'='true'"); try { primary.run("use " + primaryDbName) .run("create table acid_table (key int, 
value int) partitioned by (load_date date) " + @@ -1621,6 +1623,21 @@ public void testRangerReplicationRetryExhausted() throws Throwable { Assert.assertEquals(ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode(), ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); } + //This is now non recoverable error + try { + primary.dump(primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete non recoverable marker to fix this + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + //This should pass as non recoverable marker removed and valid configs present. + primary.dump(primaryDbName, testClause); } /* @@ -1639,12 +1656,110 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable { .run("insert into table1 values (2)"); try { primary.dump(primaryDbName, clause); + Assert.fail(); } catch (SemanticException e) { assertEquals("Invalid config error : Authorizer sentry not supported for replication " + "for ranger service.", e.getMessage()); assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); } + //This is now non recoverable error + try { + primary.dump(primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + try { + replica.load(replicatedDbName, primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete non recoverable marker to fix this + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + Assert.assertFalse(baseDumpDir.getFileSystem(primary.hiveConf).exists(nonRecoverablePath)); + //This should pass as non recoverable marker removed and valid configs present. 
+ WarehouseInstance.Tuple dump = primary.dump(primaryDbName); + String stackTrace = null; + try { + replica.load(replicatedDbName, primaryDbName, clause); + } catch (Exception e) { + Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + stackTrace = ExceptionUtils.getStackTrace(e); + } + //The marker left by the failed load now makes subsequent operations fail fast as non-recoverable + try { + replica.load(replicatedDbName, primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + try { + replica.dump(primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + nonRecoverablePath = new Path(dump.dumpLocation, NON_RECOVERABLE_MARKER.toString()); + Assert.assertTrue(baseDumpDir.getFileSystem(primary.hiveConf).exists(nonRecoverablePath)); + //Verify the stack trace recorded in the non-recoverable marker + String actualStackTrace = readStackTrace(nonRecoverablePath, primary.hiveConf); + Assert.assertEquals(stackTrace, actualStackTrace); + //Delete the non-recoverable marker to recover + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + //This should pass now + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + } + + private String readStackTrace(Path nonRecoverablePath, HiveConf conf) { + StringBuilder builder = new StringBuilder(); + try (FSDataInputStream in = nonRecoverablePath.getFileSystem(conf).open(nonRecoverablePath); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) { + String line; + while ((line = bufferedReader.readLine()) != null) { + builder.append(line); + builder.append("\n"); + } + return builder.toString(); + } catch (IOException e) { + return null; + } + } + + private Path getNonRecoverablePath(Path dumpDir, String dbName) throws IOException { + Path dumpPath = new Path(dumpDir, + Base64.getEncoder().encodeToString(dbName.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = dumpPath.getFileSystem(conf); + if (fs.exists(dumpPath)) { + FileStatus[] statuses = fs.listStatus(dumpPath); + if (statuses.length > 0) { + return new Path(statuses[0].getPath(), NON_RECOVERABLE_MARKER.toString()); + } + } + return null; } //Testing just the configs and no impact on existing replication @@ -1681,14 +1796,38 @@ public void testAtlasMissingConfigs() throws Throwable { confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete the non-recoverable marker to recover + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); 
ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); primary.dump(primaryDbName, getAtlasClause(confMap)); verifyAtlasMetadataPresent(); @@ -1697,16 +1836,48 @@ public void testAtlasMissingConfigs() throws Throwable { confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); 
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete the non-recoverable marker to recover + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)); } + private void ensureFailedAdminRepl(List<String> clause, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, clause); + } else { + primary.load(replicatedDbName, primaryDbName, clause); + } + Assert.fail(); + } catch (SemanticException e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + } + private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable { try { if (dump) { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index a5678f26e8..cc5f1dfb8e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; @@ -47,13 +48,8 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -1124,8 +1120,24 @@ public void testExternalTableBaseDirMandatory() throws Throwable { withClause = ReplicationTestUtils.includeExternalTableClause(true); withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"+ fullyQualifiedReplicaExternalBase +"'"); + try { + primary.run("use " + primaryDbName) + .dump(primaryDbName, withClause); + } catch (Exception e) { + Assert.assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete the non-recoverable marker so the next dump can proceed + Path dumpPath = new Path(primary.hiveConf.get(HiveConf.ConfVars.REPLDIR.varname), + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = dumpPath.getFileSystem(conf); + Path nonRecoverableMarker = new Path(fs.listStatus(dumpPath)[0].getPath(), ReplAck.NON_RECOVERABLE_MARKER + .toString()); + fs.delete(nonRecoverableMarker, true); + tuple = primary.run("use " + primaryDbName) - .dump(primaryDbName, withClause); + 
.dump(primaryDbName, withClause); withClause = ReplicationTestUtils.includeExternalTableClause(true); withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'=''"); @@ -1140,6 +1152,18 @@ public void testExternalTableBaseDirMandatory() throws Throwable { withClause.add("'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"+ fullyQualifiedReplicaExternalBase +"'"); + try { + replica.load(replicatedDbName, primaryDbName, withClause); + } catch (Exception e) { + Assert.assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + + //delete non recoverable marker + nonRecoverableMarker = new Path(tuple.dumpLocation, ReplAck.NON_RECOVERABLE_MARKER + .toString()); + fs.delete(nonRecoverableMarker, true); + replica.load(replicatedDbName, primaryDbName, withClause); replica.run("repl status " + replicatedDbName) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 8020fa486e..f56ca3dde8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -107,12 +107,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while dumping atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index 1506db8211..9ae4a732e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -90,12 +91,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while loading atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, 
nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 0576ecc5c1..29dcbbb233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -136,12 +137,21 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 240caf3981..d09aaac04c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -154,12 +155,21 @@ public int execute() { } catch (Exception e) { LOG.error("Failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { - LOG.error("Failed to collect Metrics", ex); + LOG.error("Failed to collect Metrics ", ex); } - return 
ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java index d9c873b332..88127a3e3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -23,7 +23,8 @@ public enum ReplAck { DUMP_ACKNOWLEDGEMENT("_finished_dump"), EVENTS_DUMP("_events_dump"), - LOAD_ACKNOWLEDGEMENT("_finished_load"); + LOAD_ACKNOWLEDGEMENT("_finished_load"), + NON_RECOVERABLE_MARKER("_non_recoverable"); private String ack; ReplAck(String ack) { this.ack = ack; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 214c12d015..0335e67bb2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -155,7 +155,12 @@ public int execute() { if (work.dataCopyIteratorsInitialized()) { initiateDataCopyTasks(); } else { - Path dumpRoot = getEncodedDumpRootPath(); + Path dumpRoot = ReplUtils.getEncodedDumpRootPath(conf, work.dbNameOrPattern.toLowerCase()); + if (ReplUtils.failedWithNonRecoverableError(ReplUtils.getLatestDumpPath(dumpRoot, conf), conf)) { + LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. "); + setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format())); + return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); + } Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); boolean isBootstrap = (previousValidHiveDumpPath == null); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. 
@@ -192,12 +197,20 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } return 0; } @@ -228,14 +241,8 @@ private boolean shouldDumpAtlasMetadata() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); } - private Path getEncodedDumpRootPath() throws UnsupportedEncodingException { - return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), - Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); - } - private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOException { - Path lastDumpPath = getLatestDumpPath(dumpRoot); + Path lastDumpPath = ReplUtils.getLatestDumpPath(dumpRoot, conf); if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, isBootstrap)) { //Resume previous dump LOG.info("Resuming the dump with existing dump directory {}", lastDumpPath); @@ -1087,24 +1094,6 @@ private String getNextDumpDir() { } } - private Path getLatestDumpPath(Path dumpRoot) throws IOException { - FileSystem fs = dumpRoot.getFileSystem(conf); - if (fs.exists(dumpRoot)) { - FileStatus[] statuses = fs.listStatus(dumpRoot); - if (statuses.length > 0) { - FileStatus latestValidStatus = statuses[0]; - for (FileStatus status : statuses) { - LOG.info("Evaluating previous dump dir path:{}", status.getPath()); - if (status.getModificationTime() > latestValidStatus.getModificationTime()) { - latestValidStatus = status; - } - } - return latestValidStatus.getPath(); - } - } - return null; - } - List dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Path dbDataRoot, Hive hiveDb, boolean copyAtLoad) throws Exception { List functionsBinaryCopyPaths = new ArrayList<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 8029b72294..8d479dd771 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -136,12 +137,20 @@ public int execute() { } catch (Exception e) { LOG.error("replication failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportEnd(Status.FAILED); + if (errorCode > 40000) { + Path nonRecoverableMarker = new Path(new Path(work.dumpDirectory).getParent(), + 
ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 985bc399de..5d71ce03ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec.repl.util; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -50,14 +53,20 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Base64; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; public class ReplUtils { @@ -90,9 +99,6 @@ // Config for hadoop default file system. public static final String DEFAULT_FS_CONFIG = "fs.defaultFS"; - // Cluster name separator, used when the cluster name contains data center name as well, e.g. dc$mycluster1. - public static final String CLUSTER_NAME_SEPARATOR = "$"; - // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. 
@@ -142,6 +148,8 @@ TABLES, FUNCTIONS, EVENTS, POLICIES, ENTITIES } + private static final Logger LOG = LoggerFactory.getLogger(ReplUtils.class); + public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs( Table table, List<Map<String, String>> partitions) throws SemanticException { Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>(); @@ -302,4 +310,48 @@ public static boolean includeAcidTableInDump(HiveConf conf) { public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) { return ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); } + + public static boolean failedWithNonRecoverableError(Path dumpRoot, HiveConf conf) throws SemanticException { + if (dumpRoot == null) { + return false; + } + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + FileSystem fs = dumpRoot.getFileSystem(conf); + return fs.exists(new Path(dumpRoot, NON_RECOVERABLE_MARKER.toString())); + }); + } catch (Exception e) { + throw new SemanticException(e); + } + } + + public static Path getEncodedDumpRootPath(HiveConf conf, String dbname) throws UnsupportedEncodingException { + return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), + Base64.getEncoder().encodeToString(dbname + .getBytes(StandardCharsets.UTF_8.name()))); + } + + public static Path getLatestDumpPath(Path dumpRoot, HiveConf conf) throws IOException { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + FileStatus[] statuses = fs.listStatus(dumpRoot); + if (statuses.length > 0) { + FileStatus latestValidStatus = statuses[0]; + for (FileStatus status : statuses) { + LOG.info("Evaluating previous dump dir path:{}", status.getPath()); + if (status.getModificationTime() > latestValidStatus.getModificationTime()) { + latestValidStatus = status; + } + } + return latestValidStatus.getPath(); + } + } + return null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 5e3f3a52d6..c748281ece 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -409,6 +409,9 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(), dmd.getDumpExecutionId())); rootTasks.add(TaskFactory.get(replLoadWork, conf)); + } else if (ReplUtils.failedWithNonRecoverableError(ReplUtils.getLatestDumpPath(ReplUtils + .getEncodedDumpRootPath(conf, sourceDbNameOrPattern.toLowerCase()), conf), conf)) { + throw new Exception(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getMsg()); + } else { LOG.warn("Previous Dump Already Loaded"); } @@ -430,9 +433,7 @@ private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Str } private Path getCurrentLoadPath() throws IOException, SemanticException { - Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), - Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase() - .getBytes(StandardCharsets.UTF_8.name()))); + Path loadPathBase = ReplUtils.getEncodedDumpRootPath(conf, sourceDbNameOrPattern.toLowerCase()); final FileSystem fs = loadPathBase.getFileSystem(conf); // Make fully qualified path for further use. 
loadPathBase = fs.makeQualified(loadPathBase); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 3ad5b2d595..c09d7f7311 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -49,6 +48,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.PrintWriter; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -130,6 +130,22 @@ public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is, } } + public static void writeStackTrace(Exception e, Path outputFile, HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).withFailOnException(FileNotFoundException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { + try (PrintWriter pw = new PrintWriter(outputFile.getFileSystem(conf).create(outputFile))) { + e.printStackTrace(pw); + } + return null; + }); + } catch (Exception ex) { + throw new SemanticException(ex); + } + } + public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException { Retryable retryable = Retryable.builder() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java index ba19e28a24..59bc626084 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -77,29 +77,45 @@ public void reportStageEnd(String stageName, Status status, long lastReplId) thr Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); - if (Status.FAILED == status) { - progress.setStatus(Status.FAILED); - } replicationMetric.setProgress(progress); Metadata metadata = replicationMetric.getMetadata(); metadata.setLastReplId(lastReplId); replicationMetric.setMetadata(metadata); metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); + } } } - public void reportStageEnd(String stageName, Status status) throws SemanticException { + public void reportStageEnd(String stageName, Status status, String errorLogPath) throws SemanticException { if (isEnabled) { LOG.debug("Stage Ended {}, {}", stageName, status ); Progress progress = replicationMetric.getProgress(); Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); - if (Status.FAILED == status) { - progress.setStatus(Status.FAILED); + stage.setErrorLogPath(errorLogPath); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); } + 
} + } + + public void reportStageEnd(String stageName, Status status) throws SemanticException { + if (isEnabled) { + LOG.debug("Stage Ended {}, {}", stageName, status ); + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); replicationMetric.setProgress(progress); metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java index 2f588eaf4e..d509c25b05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -31,6 +31,7 @@ private long startTime; private long endTime; private Map metrics = new HashMap<>(); + private String errorLogPath; public Stage() { @@ -50,6 +51,7 @@ public Stage(Stage stage) { for (Metric metric : stage.metrics.values()) { this.metrics.put(metric.getName(), new Metric(metric)); } + this.errorLogPath = stage.errorLogPath; } public String getName() { @@ -96,4 +98,12 @@ public Metric getMetricByName(String name) { public List getMetrics() { return new ArrayList<>(metrics.values()); } + + public String getErrorLogPath() { + return errorLogPath; + } + + public void setErrorLogPath(String errorLogPath) { + this.errorLogPath = errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java index 22b6236072..ebc470313f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java @@ -38,6 +38,8 @@ private List metrics = new ArrayList<>(); + private String errorLogPath; + public StageMapper() { } @@ -61,4 +63,7 @@ public long getEndTime() { return metrics; } + public String getErrorLogPath() { + return errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java index 96cf565f76..e774c9d703 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java @@ -24,5 +24,6 @@ public enum Status { SUCCESS, FAILED, - IN_PROGRESS + IN_PROGRESS, + FAILED_ADMIN } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java index c51618b236..d8141d4db4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java @@ -79,6 +79,7 @@ public void setup() throws Exception { @Test public void testFailureInvalidAuthProviderEndpoint() { + Mockito.when(work.getCurrentDumpPath()).thenReturn(new Path("dumppath")); int status = task.execute(); Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), status); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java index 625a6e161d..e17ca83c5f 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java @@ -317,4 +317,21 @@ public void testSuccessStageFailure() throws Exception { ReplicationMetric actualMetric = metricList.get(0); Assert.assertEquals(Status.FAILED, actualMetric.getProgress().getStatus()); } + + @Test + public void testSuccessStageFailedAdmin() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", conf); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.FAILED_ADMIN, "errorlogpath"); + List<ReplicationMetric> metricList = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, metricList.size()); + ReplicationMetric actualMetric = metricList.get(0); + Assert.assertEquals(Status.FAILED_ADMIN, actualMetric.getProgress().getStatus()); + Assert.assertEquals("errorlogpath", actualMetric.getProgress() + .getStageByName("dump").getErrorLogPath()); + } }