diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 8c78da9c93..d454a99b90 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -620,7 +620,8 @@ "from some other path : {1}.", true), REPL_INVALID_CONFIG_FOR_SERVICE(40008, "Invalid config error : {0} for {1} service.", true), REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE(40009, "Invalid internal config error : {0} for {1} service.", true), - REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true) + REPL_RETRY_EXHAUSTED(40010, "Retry exhausted for retryable error code {0}.", true), + REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40011, "Replication failed with non recoverable error. Needs manual intervention") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index d735c9bd20..1d3d0593ae 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -45,10 +46,7 @@ import org.junit.Test; import javax.annotation.Nullable; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; @@ -65,6 +63,7 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -1608,6 +1607,8 @@ public void testRangerReplicationRetryExhausted() throws Throwable { "'" + HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY + "'='1s'", "'" + HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION + "'='30s'", "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL + "'='false'", "'" + HiveConf.ConfVars.HIVE_IN_TEST + "'='false'"); + List<String> testClause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'", + "'hive.in.test'='true'"); try { primary.run("use " + primaryDbName) .run("create table acid_table (key int, value int) partitioned by (load_date date) " + @@ -1621,6 +1622,21 @@ Assert.assertEquals(ErrorMsg.REPL_RETRY_EXHAUSTED.getErrorCode(), ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); } + //This is now non recoverable error + try { + primary.dump(primaryDbName, clause); + Assert.fail(); + } catch (Exception e) {
assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete non recoverable marker to fix this + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + //This should pass as non recoverable marker removed and valid configs present. + primary.dump(primaryDbName, testClause); } /* @@ -1639,12 +1655,68 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable { .run("insert into table1 values (2)"); try { primary.dump(primaryDbName, clause); + Assert.fail(); } catch (SemanticException e) { assertEquals("Invalid config error : Authorizer sentry not supported for replication " + "for ranger service.", e.getMessage()); assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); } + //This is now non recoverable error + try { + primary.dump(primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete non recoverable marker to fix this + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + //This should pass as non recoverable marker removed and valid configs present. + WarehouseInstance.Tuple dump = primary.dump(primaryDbName); + + try { + replica.load(replicatedDbName, primaryDbName, clause); + } catch (Exception e) { + Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //This is now non recoverable error + try { + replica.load(replicatedDbName, primaryDbName, clause); + Assert.fail(); + } catch (Exception e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + //Delete non recoverable marker to fix this + nonRecoverablePath = new Path(dump.dumpLocation, NON_RECOVERABLE_MARKER.toString()); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); + //This should pass now + replica.load(replicatedDbName, primaryDbName) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + } + + private Path getNonRecoverablePath(Path dumpDir, String dbName) throws IOException { + Path dumpPath = new Path(dumpDir, + Base64.getEncoder().encodeToString(dbName.toLowerCase() + .getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = dumpPath.getFileSystem(conf); + if (fs.exists(dumpPath)) { + FileStatus[] statuses = fs.listStatus(dumpPath); + if (statuses.length > 0) { + return new Path(statuses[0].getPath(), NON_RECOVERABLE_MARKER.toString()); + } + } + return null; } //Testing just the configs and no impact on existing replication @@ -1681,14 +1753,38 @@ public void testAtlasMissingConfigs() throws Throwable { confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, 
"true"); confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + Path baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + Path nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true); + ensureFailedAdminRepl(getAtlasClause(confMap), true); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); primary.dump(primaryDbName, getAtlasClause(confMap)); verifyAtlasMetadataPresent(); @@ -1697,16 +1793,48 @@ public void testAtlasMissingConfigs() throws Throwable { confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); 
confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false); + ensureFailedAdminRepl(getAtlasClause(confMap), false); + //Delete non recoverable marker to fix this + baseDumpDir = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR)); + nonRecoverablePath = getNonRecoverablePath(baseDumpDir, primaryDbName); + Assert.assertNotNull(nonRecoverablePath); + baseDumpDir.getFileSystem(primary.hiveConf).delete(nonRecoverablePath, true); confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)); } + private void ensureFailedAdminRepl(List<String> clause, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, clause); + } else { + primary.load(replicatedDbName, primaryDbName, clause); + } + Assert.fail(); + } catch (SemanticException e) { + assertEquals(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(), + ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode()); + } + } + private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable { try { if (dump) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 8020fa486e..f56ca3dde8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -107,12 +107,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while dumping atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index 1506db8211..9ae4a732e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.EximUtil; import
org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -90,12 +91,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while loading atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 0576ecc5c1..29dcbbb233 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -136,12 +137,21 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 240caf3981..d09aaac04c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; 
import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -154,12 +155,21 @@ public int execute() { } catch (Exception e) { LOG.error("Failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { - LOG.error("Failed to collect Metrics", ex); + LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java index d9c873b332..88127a3e3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -23,7 +23,8 @@ public enum ReplAck { DUMP_ACKNOWLEDGEMENT("_finished_dump"), EVENTS_DUMP("_events_dump"), - LOAD_ACKNOWLEDGEMENT("_finished_load"); + LOAD_ACKNOWLEDGEMENT("_finished_load"), + NON_RECOVERABLE_MARKER("_non_recoverable"); private String ack; ReplAck(String ack) { this.ack = ack; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 214c12d015..ac1ee0ebef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -156,6 +156,11 @@ public int execute() { initiateDataCopyTasks(); } else { Path dumpRoot = getEncodedDumpRootPath(); + if (ReplUtils.failedWithNonRecoverableError(getLatestDumpPath(dumpRoot), conf)) { + LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. "); + setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format())); + return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); + } Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); boolean isBootstrap = (previousValidHiveDumpPath == null); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. 
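The hunk above and the catch-block hunk that follows are the two halves of the new marker protocol in ReplDumpTask: a fresh dump cycle is refused while a `_non_recoverable` marker exists under the latest dump directory, and the marker is written whenever the failure maps to an ErrorMsg code above 40000. A minimal sketch of that convention (the threshold and file name are taken from this patch; the helper class itself is illustrative):

```java
import org.apache.hadoop.fs.Path;

// Illustrative only: captures the convention used throughout this patch, where
// ErrorMsg codes above 40000 (REPL_RETRY_EXHAUSTED = 40010,
// REPL_FAILED_WITH_NON_RECOVERABLE_ERROR = 40011, ...) need manual intervention.
public final class NonRecoverableConvention {
  // Mirrors the "errorCode > 40000" checks added in the task catch blocks.
  private static final int NON_RECOVERABLE_THRESHOLD = 40000;

  private NonRecoverableConvention() {
  }

  public static boolean isNonRecoverable(int errorMsgCode) {
    return errorMsgCode > NON_RECOVERABLE_THRESHOLD;
  }

  // The marker lands next to the dump data, e.g.
  // <hive.repl.rootdir>/<base64(dbName)>/<dumpId>/_non_recoverable.
  public static Path markerPath(Path currentDumpPath) {
    return new Path(currentDumpPath, "_non_recoverable");
  }
}
```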
@@ -192,12 +197,20 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + Path nonRecoverableMarker = new Path(work.getCurrentDumpPath(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } return 0; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 8029b72294..87419fb330 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -62,10 +62,12 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -107,6 +109,11 @@ public StageType getType() { public int execute() { try { SecurityUtils.reloginExpiringKeytabUser(); + if (ReplUtils.failedWithNonRecoverableError(new Path(work.dumpDirectory).getParent(), conf)) { + LOG.error("Previous load failed with non recoverable error. Needs manual intervention. 
"); + setException(new SemanticException(ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.format())); + return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); + } Task rootTask = work.getRootTask(); if (rootTask != null) { rootTask.setChildTasks(null); @@ -136,12 +143,20 @@ public int execute() { } catch (Exception e) { LOG.error("replication failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportEnd(Status.FAILED); + if (errorCode > 40000) { + Path nonRecoverableMarker = new Path(new Path(work.dumpDirectory).getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString()); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 137cc29121..9c3eea7097 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec.repl.util; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -35,7 +37,9 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.PartitionUtils; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplAck; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -61,6 +65,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMigrationOption.MANAGED; public class ReplUtils { @@ -366,4 +371,26 @@ public static boolean includeAcidTableInDump(HiveConf conf) { public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) { return ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); } + + public static boolean failedWithNonRecoverableError(Path dumpRoot, HiveConf conf) throws SemanticException { + if (dumpRoot == null) { + return false; + } + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + FileSystem fs = dumpRoot.getFileSystem(conf); + if (fs.exists(dumpRoot)) { + if (fs.exists(new Path(dumpRoot, NON_RECOVERABLE_MARKER.toString()))) { + return true; + } + } + return false; + }); + } catch (Exception e) { + throw new SemanticException(e); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 3ad5b2d595..636c7bbced 100644 --- 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 3ad5b2d595..636c7bbced 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -45,10 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -130,6 +126,22 @@ public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is, } } + public static void writeStackTrace(Exception e, Path outputFile, HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).withFailOnException(FileNotFoundException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { + PrintWriter pw = new PrintWriter(outputFile.getFileSystem(conf).create(outputFile)); + e.printStackTrace(pw); + pw.close(); + return null; + }); + } catch (Exception ex) { + throw new SemanticException(ex); + } + } + public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException { Retryable retryable = Retryable.builder()
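`writeStackTrace` persists the failing exception's stack trace into the marker file, so the marker doubles as an error log; its path is what the tasks pass to `reportStageEnd` as `errorLogPath` in the next file. A small sketch of reading it back for diagnosis, using only standard FileSystem calls:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative reader for the stack trace stored in the _non_recoverable marker.
public final class MarkerErrorLog {
  private MarkerErrorLog() {
  }

  public static String read(Path marker, Configuration conf) throws IOException {
    StringBuilder sb = new StringBuilder();
    FileSystem fs = marker.getFileSystem(conf);
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(marker), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        sb.append(line).append(System.lineSeparator());
      }
    }
    return sb.toString();
  }
}
```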
{}", stageName, status ); + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); replicationMetric.setProgress(progress); metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java index 2f588eaf4e..d509c25b05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -31,6 +31,7 @@ private long startTime; private long endTime; private Map metrics = new HashMap<>(); + private String errorLogPath; public Stage() { @@ -50,6 +51,7 @@ public Stage(Stage stage) { for (Metric metric : stage.metrics.values()) { this.metrics.put(metric.getName(), new Metric(metric)); } + this.errorLogPath = stage.errorLogPath; } public String getName() { @@ -96,4 +98,12 @@ public Metric getMetricByName(String name) { public List getMetrics() { return new ArrayList<>(metrics.values()); } + + public String getErrorLogPath() { + return errorLogPath; + } + + public void setErrorLogPath(String errorLogPath) { + this.errorLogPath = errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java index 22b6236072..ebc470313f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java @@ -38,6 +38,8 @@ private List metrics = new ArrayList<>(); + private String errorLogPath; + public StageMapper() { } @@ -61,4 +63,7 @@ public long getEndTime() { return metrics; } + public String getErrorLogPath() { + return errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java index 96cf565f76..e774c9d703 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java @@ -24,5 +24,6 @@ public enum Status { SUCCESS, FAILED, - IN_PROGRESS + IN_PROGRESS, + FAILED_ADMIN } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java index c51618b236..d8141d4db4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java @@ -79,6 +79,7 @@ public void setup() throws Exception { @Test public void testFailureInvalidAuthProviderEndpoint() { + Mockito.when(work.getCurrentDumpPath()).thenReturn(new Path("dumppath")); int status = task.execute(); Assert.assertEquals(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.getErrorCode(), status); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java index 625a6e161d..e17ca83c5f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java +++ 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java index 625a6e161d..e17ca83c5f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java @@ -317,4 +317,21 @@ public void testSuccessStageFailure() throws Exception { ReplicationMetric actualMetric = metricList.get(0); Assert.assertEquals(Status.FAILED, actualMetric.getProgress().getStatus()); } + + @Test + public void testSuccessStageFailedAdmin() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", conf); + Map<String, Long> metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.FAILED_ADMIN, "errorlogpath"); + List<ReplicationMetric> metricList = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, metricList.size()); + ReplicationMetric actualMetric = metricList.get(0); + Assert.assertEquals(Status.FAILED_ADMIN, actualMetric.getProgress().getStatus()); + Assert.assertEquals("errorlogpath", actualMetric.getProgress() + .getStageByName("dump").getErrorLogPath()); + } }
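Taken together, the catch blocks added across the dump and load task classes follow one pattern. A condensed, illustrative version of it (the collector, dump path, and stage name are parameters here; in the real tasks they come from the task and its work object, and some tasks anchor the marker one directory higher than others):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;

// Condensed form of the failure handling this patch adds to the repl tasks.
public final class FailureHandlingSketch {
  private FailureHandlingSketch() {
  }

  public static int handleFailure(Exception e, Path dumpDir, String stageName,
      ReplicationMetricCollector collector, HiveConf conf) throws SemanticException {
    int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
    if (errorCode > 40000) {
      // Non-recoverable: persist the stack trace as the marker and report
      // FAILED_ADMIN along with the marker's location as the error log path.
      Path marker = new Path(dumpDir, ReplAck.NON_RECOVERABLE_MARKER.toString());
      Utils.writeStackTrace(e, marker, conf);
      collector.reportStageEnd(stageName, Status.FAILED_ADMIN, marker.toString());
    } else {
      collector.reportStageEnd(stageName, Status.FAILED);
    }
    return errorCode;
  }
}
```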