diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index d94341263d..909393fb83 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -616,7 +616,8 @@ SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true), - REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.") + REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication."), + REPL_FAILED_WITH_NON_RECOVERABLE_ERROR(40009, "Replication failed with non recoverable error. Needs manual intervention") ; private int errorCode; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index e6384af5dd..06c672751a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -42,11 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Serializable; +import java.io.*; import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; @@ -105,12 +101,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while dumping atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + String nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index b24b3d62d7..3fd18469c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -88,12 +89,21 @@ public int execute() { } catch (Exception e) { LOG.error("Exception while loading atlas metadata", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + String nonRecoverableMarker = new Path(work.getStagingDir().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 4c1a1708a7..fa12f718a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -128,12 +129,21 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + String nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 3c7e9e2463..589174d1e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -146,12 +147,21 @@ public int execute() { } catch (Exception e) { LOG.error("Failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + //Create non recoverable marker at top level + String nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { - LOG.error("Failed to collect Metrics", ex); + LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java index d9c873b332..88127a3e3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java @@ -23,7 +23,8 @@ public enum ReplAck { DUMP_ACKNOWLEDGEMENT("_finished_dump"), EVENTS_DUMP("_events_dump"), - LOAD_ACKNOWLEDGEMENT("_finished_load"); + LOAD_ACKNOWLEDGEMENT("_finished_load"), + NON_RECOVERABLE_MARKER("_non_recoverable"); private String ack; ReplAck(String ack) { this.ack = ack; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index dc8d7903fc..656c0a82e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -87,7 +87,6 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.io.IOUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +116,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; @@ -160,6 +160,10 @@ public int execute() { initiateDataCopyTasks(); } else { Path dumpRoot = getEncodedDumpRootPath(); + if (ReplUtils.failedWithNonRecoverableError(dumpRoot, conf)) { + LOG.error("Previous dump failed with non recoverable error. Needs manual intervention. "); + return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); + } Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot); boolean isBootstrap = (previousValidHiveDumpPath == null); //If no previous dump is present or previous dump is already loaded, proceed with the dump operation. @@ -196,12 +200,20 @@ public int execute() { } catch (Exception e) { LOG.error("failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + if (errorCode > 40000) { + String nonRecoverableMarker = new Path(work.getCurrentDumpPath(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } return 0; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 3c3dc444db..e47e803485 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -62,10 +62,12 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.api.StageType; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -107,6 +109,10 @@ public StageType getType() { public int execute() { try { SecurityUtils.reloginExpiringKeytabUser(); + if (ReplUtils.failedWithNonRecoverableError(new Path(work.dumpDirectory).getParent(), conf)) { + LOG.error("Previous load failed with non recoverable error. Needs manual intervention. "); + return ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode(); + } Task rootTask = work.getRootTask(); if (rootTask != null) { rootTask.setChildTasks(null); @@ -136,12 +142,20 @@ public int execute() { } catch (Exception e) { LOG.error("replication failed", e); setException(e); + int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); try { - work.getMetricCollector().reportEnd(Status.FAILED); + if (errorCode > 40000) { + String nonRecoverableMarker = new Path(new Path(work.dumpDirectory).getParent(), + ReplAck.NON_RECOVERABLE_MARKER.toString()).toString(); + Utils.writeStackTrace(e, nonRecoverableMarker, conf); + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker); + } else { + work.getMetricCollector().reportStageEnd(getName(), Status.FAILED); + } } catch (SemanticException ex) { LOG.error("Failed to collect Metrics ", ex); } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + return errorCode; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index d531f5bff2..63d61fd204 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.common.repl.ReplConst; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -61,6 +63,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMigrationOption.MANAGED; public class ReplUtils { @@ -356,4 +359,24 @@ public static boolean includeAcidTableInDump(HiveConf conf) { public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) { return ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); } + + public static boolean failedWithNonRecoverableError(Path dumpRoot, HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + FileSystem fs = dumpRoot.getFileSystem(conf); + Path nonRecoverableErrorPath = new Path(dumpRoot, NON_RECOVERABLE_MARKER.toString()); + if (fs.exists(dumpRoot)) { + if (fs.exists(nonRecoverableErrorPath)) { + return true; + } + } + return false; + }); + } catch (Exception e) { + throw new SemanticException(e); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 3ad5b2d595..98f0ff3d9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -45,10 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -130,6 +127,22 @@ public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is, } } + public static void writeStackTrace(Exception e, String outputFile, HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable) () -> { + PrintWriter pw = new PrintWriter(new File(outputFile)); + e.printStackTrace(pw); + pw.close(); + return null; + }); + } catch (Exception ex) { + throw new SemanticException(ex); + } + } + public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException { Retryable retryable = Retryable.builder() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java index ba19e28a24..59bc626084 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -77,29 +77,45 @@ public void reportStageEnd(String stageName, Status status, long lastReplId) thr Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); - if (Status.FAILED == status) { - progress.setStatus(Status.FAILED); - } replicationMetric.setProgress(progress); Metadata metadata = replicationMetric.getMetadata(); metadata.setLastReplId(lastReplId); replicationMetric.setMetadata(metadata); metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); + } } } - public void reportStageEnd(String stageName, Status status) throws SemanticException { + public void reportStageEnd(String stageName, Status status, String errorLogPath) throws SemanticException { if (isEnabled) { LOG.debug("Stage Ended {}, {}", stageName, status ); Progress progress = replicationMetric.getProgress(); Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); - if (Status.FAILED == status) { - progress.setStatus(Status.FAILED); + stage.setErrorLogPath(errorLogPath); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); } + } + } + + public void reportStageEnd(String stageName, Status status) throws SemanticException { + if (isEnabled) { + LOG.debug("Stage Ended {}, {}", stageName, status ); + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName(stageName); + stage.setStatus(status); + stage.setEndTime(System.currentTimeMillis()); replicationMetric.setProgress(progress); metricCollector.addMetric(replicationMetric); + if (Status.FAILED == status || Status.FAILED_ADMIN == status) { + reportEnd(status); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java index 2f588eaf4e..d509c25b05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -31,6 +31,7 @@ private long startTime; private long endTime; private Map metrics = new HashMap<>(); + private String errorLogPath; public Stage() { @@ -50,6 +51,7 @@ public Stage(Stage stage) { for (Metric metric : stage.metrics.values()) { this.metrics.put(metric.getName(), new Metric(metric)); } + this.errorLogPath = stage.errorLogPath; } public String getName() { @@ -96,4 +98,12 @@ public Metric getMetricByName(String name) { public List getMetrics() { return new ArrayList<>(metrics.values()); } + + public String getErrorLogPath() { + return errorLogPath; + } + + public void setErrorLogPath(String errorLogPath) { + this.errorLogPath = errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java index 22b6236072..ebc470313f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java @@ -38,6 +38,8 @@ private List metrics = new ArrayList<>(); + private String errorLogPath; + public StageMapper() { } @@ -61,4 +63,7 @@ public long getEndTime() { return metrics; } + public String getErrorLogPath() { + return errorLogPath; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java index 96cf565f76..e774c9d703 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Status.java @@ -24,5 +24,6 @@ public enum Status { SUCCESS, FAILED, - IN_PROGRESS + IN_PROGRESS, + FAILED_ADMIN }