diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 142d779641..24657b164d 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -71,7 +71,6 @@ public final class FileUtils { private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class.getName()); private static final Random random = new Random(); - public static final int MAX_IO_ERROR_RETRY = 5; public static final int IO_ERROR_SLEEP_TIME = 100; public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() { diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index d94341263d..fbd860e304 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -505,18 +505,8 @@ " queue: {1}. Please fix and try again.", true), SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."), - //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification - //method in HiveMetaStoreClient - REPL_EVENTS_MISSING_IN_METASTORE(20016, "Notification events are missing in the meta store."), - REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(20017, "Load path {0} not valid as target database is bootstrapped " + - "from some other path : {1}."), - REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20018, "File is missing from both source and cm path."), - REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."), - REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020, - "Source of replication (repl.source.for) is not set in the database properties."), - REPL_INVALID_DB_OR_TABLE_PATTERN(20021, - "Invalid pattern for the DB or table name in the replication policy. " - + "It should be a valid regex enclosed within single or double quotes."), + REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH(20016, "File is missing from both source and cm path."), + REPL_EXTERNAL_SERVICE_CONNECTION_ERROR(20017, "Failed to connect to {0} service. Error code {1}."), // An exception from runtime that will show the full stack to client UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true), @@ -607,7 +597,7 @@ SPARK_GET_JOB_INFO_INTERRUPTED(30045, "Spark job was interrupted while getting job info"), SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"), - REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."), + REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired. 
Error {0}"), SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true), SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."), SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true), @@ -616,7 +606,19 @@ SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true), - REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.") + REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication."), + REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(40004, + "Source of replication (repl.source.for) is not set in the database properties."), + REPL_INVALID_DB_OR_TABLE_PATTERN(40005, + "Invalid pattern for the DB or table name in the replication policy. " + + "It should be a valid regex enclosed within single or double quotes."), + //if the error message is changed for REPL_EVENTS_MISSING_IN_METASTORE, then need modification in getNextNotification + //method in HiveMetaStoreClient + REPL_EVENTS_MISSING_IN_METASTORE(40006, "Notification events are missing in the meta store."), + REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID(40007, "Load path {0} not valid as target database is bootstrapped " + + "from some other path : {1}."), + REPL_INVALID_CONFIG_FOR_SERVICE(40007, "Invalid config error : {0} for {1} service."), + REPL_RETRY_EXHAUSTED(40007, "Retry exhausted for retryable error code {}.") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index fb38b8c44b..f4e7c4f57a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -3120,13 +3120,6 @@ public void testReplErrorScenarios() throws Exception { assertTrue(e.getErrorCode() == ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getErrorCode()); } - try { - // invalid load path - stmt.execute("repl load default into default1"); - } catch(SQLException e){ - assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode()); - } - stmt.close(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index e6384af5dd..73ba6a6c85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; @@ -42,11 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Serializable; +import java.io.*; import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; @@ -56,6 +53,7 @@ import 
java.util.List; import java.util.Arrays; import java.util.ArrayList; +import java.util.concurrent.Callable; /** * Atlas Metadata Replication Dump Task. @@ -132,31 +130,40 @@ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedU private long lastStoredTimeStamp() throws SemanticException { Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME); - BufferedReader br = null; + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class) + .withFailOnException(FileNotFoundException.class).build(); try { - FileSystem fs = prevMetadataPath.getFileSystem(conf); - br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset())); - String line = br.readLine(); - if (line == null) { - throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file"); - } - String[] lineContents = line.split("\t", 5); - return Long.parseLong(lineContents[1]); - } catch (Exception ex) { - throw new SemanticException(ex); - } finally { - if (br != null) { + return retryable.executeCallable(() -> { + BufferedReader br = null; try { - br.close(); - } catch (IOException e) { - throw new SemanticException(e); + FileSystem fs = prevMetadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset())); + String line = br.readLine(); + if (line == null) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Could not read lastStoredTimeStamp from atlas metadata file", "atlas")); + } + String[] lineContents = line.split("\t", 5); + return Long.parseLong(lineContents[1]); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + //Do nothing + } + } } - } + }); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException { - AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster()); + AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster(), conf); long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null) ? 
0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid); LOG.debug("Current timestamp is: {}", ret); @@ -177,7 +184,7 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at } catch (SemanticException ex) { throw ex; } catch (Exception ex) { - throw new SemanticException(ex); + throw new SemanticException(ex.getMessage(), ex); } finally { if (inputStream != null) { try { @@ -196,12 +203,14 @@ private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, Stri AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb); Set> entries = objectId.getUniqueAttributes().entrySet(); if (entries == null || entries.isEmpty()) { - throw new SemanticException("Could find entries in objectId for:" + clusterName); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Could find " + + "entries in objectId for:" + clusterName, "atlas")); } Map.Entry item = entries.iterator().next(); String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue()); if (guid == null || guid.isEmpty()) { - throw new SemanticException("Entity not found:" + objectId); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Entity not found:" + objectId, "atlas")); } return guid; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index b24b3d62d7..c92526090b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger; @@ -48,6 +49,7 @@ import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; /** * Atlas Metadata Replication Load Task. 
@@ -113,26 +115,30 @@ AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLExcept private String getStoredFsUri(Path atlasDumpDir) throws SemanticException { Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME); - BufferedReader br = null; + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); try { - FileSystem fs = metadataPath.getFileSystem(conf); - br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset())); - String line = br.readLine(); - if (line == null) { - throw new SemanticException("Could not read stored src FS Uri from atlas metadata file"); - } - String[] lineContents = line.split("\t", 5); - return lineContents[0]; - } catch (Exception ex) { - throw new SemanticException(ex); - } finally { - if (br != null) { + return retryable.executeCallable(() -> { + BufferedReader br = null; try { - br.close(); - } catch (IOException e) { - throw new SemanticException(e); + FileSystem fs = metadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset())); + String line = br.readLine(); + if (line == null) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Could not read stored " + + "src FS Uri from atlas metadata file", "atlas")); + } + String[] lineContents = line.split("\t", 5); + return lineContents[0]; + } finally { + if (br != null) { + br.close(); + } } - } + }); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java index e8a8df1e12..70f4c0169e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.shims.ShimLoader; @@ -43,7 +44,6 @@ */ public class DirCopyTask extends Task implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class); - private static final int MAX_COPY_RETRY = 5; private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException { FileSystem targetFs = destPath.getFileSystem(conf); @@ -51,7 +51,8 @@ private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOE if (!targetFs.exists(destPath)) { // target path is created even if the source path is missing, so that ddl task does not try to create it. 
if (!targetFs.mkdirs(destPath)) { - throw new IOException(destPath + " is not a directory or unable to create one"); + throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.format( + destPath + " is not a directory or unable to create one")); } createdDir = true; } @@ -86,107 +87,59 @@ private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser return proxyUser.doAs((PrivilegedExceptionAction) () -> sourcePath.getFileSystem(conf).exists(sourcePath)); } - private int handleException(Exception e, Path sourcePath, Path targetPath, - int currentRetry, UserGroupInformation proxyUser) { - try { - LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e); - if (!checkIfPathExist(sourcePath, proxyUser)) { - LOG.info("Source path is missing. Ignoring exception."); - return 0; - } - } catch (Exception ex) { - LOG.warn("Source path missing check failed. ", ex); - } - // retry logic only for i/o exception - if (!(e instanceof IOException)) { - LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e); - setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - - if (currentRetry <= MAX_COPY_RETRY) { - LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e); - } else { - LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e); - setException(e); - return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode(); - } - int sleepTime = FileUtils.getSleepTime(currentRetry); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry)); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); - } - try { - if (proxyUser == null) { - proxyUser = Utils.getUGI(); - } - FileSystem.closeAllForUGI(proxyUser); - } catch (Exception ex) { - LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex); - } - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); - } - @Override public int execute() { String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + UserGroupInformation proxyUser = null; + Path sourcePath = work.getFullyQualifiedSourcePath(); + Path targetPath = work.getFullyQualifiedTargetPath(); + try { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { + sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri()); + targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri()); + } + UserGroupInformation ugi = Utils.getUGI(); + String currentUser = ugi.getShortUserName(); + if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) { + proxyUser = UserGroupInformation.createProxyUser( + distCpDoAsUser, UserGroupInformation.getLoginUser()); + } - Path sourcePath = work.getFullyQualifiedSourcePath(); - Path targetPath = work.getFullyQualifiedTargetPath(); - if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) { - sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri()); - targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri()); - } - int currentRetry = 0; - int error = 0; - UserGroupInformation proxyUser = null; - while (currentRetry <= MAX_COPY_RETRY) { - try { - UserGroupInformation ugi = Utils.getUGI(); - String currentUser = ugi.getShortUserName(); - if 
(distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) { - proxyUser = UserGroupInformation.createProxyUser( - distCpDoAsUser, UserGroupInformation.getLoginUser()); - } - - setTargetPathOwner(targetPath, sourcePath, proxyUser); - - // do we create a new conf and only here provide this additional option so that we get away from - // differences of data in two location for the same directories ? - // basically add distcp.options.delete to hiveconf new object ? - FileUtils.distCp( - sourcePath.getFileSystem(conf), // source file system - Collections.singletonList(sourcePath), // list of source paths - targetPath, - false, - proxyUser, - conf, - ShimLoader.getHadoopShims()); - return 0; - } catch (Exception e) { - currentRetry++; - error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser); - if (error == 0) { - return 0; - } - } finally { - if (proxyUser != null) { + setTargetPathOwner(targetPath, sourcePath, proxyUser); try { - FileSystem.closeAllForUGI(proxyUser); - } catch (IOException e) { - LOG.error("Unable to closeAllForUGI for user " + proxyUser, e); - if (error == 0) { - setException(e); - error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + if (!checkIfPathExist(sourcePath, proxyUser)) { + LOG.info("Source path is missing. Ignoring exception."); + return 0; } - break; + } catch (Exception ex) { + LOG.warn("Source path missing check failed. ", ex); + } + // do we create a new conf and only here provide this additional option so that we get away from + // differences of data in two location for the same directories ? + // basically add distcp.options.delete to hiveconf new object ? + FileUtils.distCp( + sourcePath.getFileSystem(conf), // source file system + Collections.singletonList(sourcePath), // list of source paths + targetPath, + false, + proxyUser, + conf, + ShimLoader.getHadoopShims()); + return 0; + } finally { + if (proxyUser != null) { + FileSystem.closeAllForUGI(proxyUser); } } - } + }); + } catch (Exception e) { + throw new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } - return error; } private static Path reservedRawPath(URI uri) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 4c1a1708a7..20e385e7c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -92,14 +92,21 @@ public int execute() { } URL url = work.getRangerConfigResource(); if (url == null) { - throw new SemanticException("Ranger configuration is not valid " - + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Ranger configuration is not valid " + + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, "ranger")); } conf.addResource(url); String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME); String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL); - if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) { - throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint); + if (StringUtils.isEmpty(rangerEndpoint)) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Ranger endpoint is not valid " + + rangerEndpoint, "ranger")); + } + if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) { + throw new 
SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format("ranger", + "Ranger endpoint is not valid " + rangerEndpoint)); } RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint, work.getDbName(), rangerHiveServiceName, conf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 3c7e9e2463..23b9663199 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -89,14 +89,21 @@ public int execute() { } URL url = work.getRangerConfigResource(); if (url == null) { - throw new SemanticException("Ranger configuration is not valid " - + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Ranger configuration is not valid " + + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, "ranger")); } conf.addResource(url); String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME); String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL); - if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) { - throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint); + if (StringUtils.isEmpty(rangerEndpoint)) { + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE + .format("Ranger endpoint is not valid " + + rangerEndpoint, "ranger")); + } + if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) { + throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format("ranger", + "Ranger endpoint is not valid " + rangerEndpoint)); } if (work.getCurrentDumpPath() != null) { LOG.info("Importing Ranger Metadata from {} ", work.getCurrentDumpPath()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index dc8d7903fc..cd5e93f830 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -21,7 +21,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.Constants; @@ -44,7 +43,6 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; -import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -87,12 +85,10 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.io.IOUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.auth.login.LoginException; import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; @@ -218,8 +214,9 @@ private void 
initiateAuthorizationDumpTask() throws SemanticException { } childTasks.add(rangerDumpTask); } else { - throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) - + " not supported for replication "); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " + + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) + + " not supported for replication ", "ranger")); } } @@ -647,6 +644,7 @@ private int getMaxEventAllowed(int currentEventMaxLimit) { } return currentEventMaxLimit; } + private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException { Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); Retryable retryable = Retryable.builder() @@ -668,22 +666,25 @@ private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws S } private long getResumeFrom(Path ackFile) throws SemanticException { - BufferedReader br = null; + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(Exception.class).build(); try { - FileSystem fs = ackFile.getFileSystem(conf); - br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset())); - long lastEventID = Long.parseLong(br.readLine()); - return lastEventID; - } catch (Exception ex) { - throw new SemanticException(ex); - } finally { - if (br != null) { + return retryable.executeCallable(() -> { + BufferedReader br = null; try { - br.close(); - } catch (IOException e) { - throw new SemanticException(e); + FileSystem fs = ackFile.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(ackFile), Charset.defaultCharset())); + long lastEventID = Long.parseLong(br.readLine()); + return lastEventID; + } finally { + if (br != null) { + br.close(); + } } - } + }); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @@ -729,7 +730,7 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) { } private void dumpTableListToDumpLocation(List tableList, Path dbRoot, String dbName, - HiveConf hiveConf) throws IOException, LoginException { + HiveConf hiveConf) throws Exception { // Empty list will create an empty file to distinguish it from db level replication. If no file is there, that means // db level replication. If empty file is there, means no table satisfies the policy. 
if (tableList == null) { @@ -738,12 +739,14 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St } // The table list is dumped in _tables/dbname file - Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME); - tableListFile = new Path(tableListFile, dbName.toLowerCase()); - - int count = 0; - while (count < FileUtils.MAX_IO_ERROR_RETRY) { - try (FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile)) { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable) () -> { + Path tableListFile = new Path(dbRoot, ReplUtils.REPL_TABLE_LIST_DIR_NAME); + tableListFile = new Path(tableListFile, dbName.toLowerCase()); + FSDataOutputStream writer = FileSystem.get(hiveConf).create(tableListFile); for (String tableName : tableList) { String line = tableName.toLowerCase().concat("\n"); writer.write(line.getBytes(StandardCharsets.UTF_8)); @@ -751,27 +754,13 @@ private void dumpTableListToDumpLocation(List tableList, Path dbRoot, St // Close is called explicitly as close also calls the actual file system write, // so there is chance of i/o exception thrown by close. writer.close(); - break; - } catch (IOException e) { - LOG.info("File operation failed", e); - if (count >= (FileUtils.MAX_IO_ERROR_RETRY - 1)) { - //no need to wait in the last iteration - LOG.error("File " + tableListFile.toUri() + " creation failed even after " + - FileUtils.MAX_IO_ERROR_RETRY + " attempts."); - throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); - } - int sleepTime = FileUtils.getSleepTime(count); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (count+1)); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); - } - FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI()); - } - count++; + LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); + return null; + }); + } catch (Exception e) { + FileSystem.closeAllForUGI(org.apache.hadoop.hive.shims.Utils.getUGI()); + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } - LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList); } Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) @@ -1064,8 +1053,9 @@ String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveEx } } else { LOG.warn("Force abort all the open txns is disabled after timeout"); - throw new IllegalStateException("REPL DUMP cannot proceed. Force abort all the open txns is disabled. Enable " + - "hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed."); + throw new IllegalStateException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("REPL DUMP cannot " + + "proceed. Force abort all the open txns is disabled. 
Enable " + + "hive.repl.bootstrap.dump.abort.write.txn.after.timeout to proceed.", "hive")); } return validTxnList.toString(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index b6e085891d..d29046790b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.FileList; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -48,6 +49,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; /** * Format of the file used to dump information about external tables: @@ -62,7 +64,6 @@ private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class); private static final String FIELD_SEPARATOR = ","; public static final String FILE_NAME = "_external_tables_info"; - private static final int MAX_RETRIES = 5; private ReplExternalTables(){} @@ -79,9 +80,9 @@ public static Path getExternalTableBaseDir(HiveConf hiveConf) throws SemanticExc String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname); URI baseDirUri = StringUtils.isEmpty(baseDir) ? null : new Path(baseDir).toUri(); if (baseDirUri == null || baseDirUri.getScheme() == null || baseDirUri.getAuthority() == null) { - throw new SemanticException( + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format( String.format("Fully qualified path for 'hive.repl.replica.external.table.base.dir' is required %s", - baseDir == null ? "" : "- ('" + baseDir + "')")); + baseDir == null ? 
"" : "- ('" + baseDir + "')"), "hive")); } return new Path(baseDirUri); } @@ -102,7 +103,8 @@ public static Path externalTableDataPath(HiveConf hiveConf, Path basePath, Path basePath.getFileSystem(hiveConf) ); } catch (IOException e) { - throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format( + ErrorMsg.INVALID_PATH.getMsg(), "hive"), e); } return dataPath; } @@ -196,23 +198,22 @@ private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf) return lineToWrite.toString(); } - private void write(String line) throws InterruptedException { - int currentRetry = 0; - while (currentRetry < MAX_RETRIES) { - try { - writer.write(line.getBytes(StandardCharsets.UTF_8)); - break; - } catch (IOException e) { - currentRetry++; - if (currentRetry < MAX_RETRIES) { - LOG.warn("failed to write data with maxRetries {} due to", currentRetry, e); - } else { - LOG.error("failed to write data with maxRetries {} due to", currentRetry, e); - throw new RuntimeException("failed to write data", e); + private void write(String line) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable) () -> { + try { + writer.write(line.getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + writer = openWriterAppendMode(); + throw e; } - Thread.sleep(100 * currentRetry * currentRetry); - writer = openWriterAppendMode(); - } + return null; + }); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @@ -288,33 +289,27 @@ private BufferedReader reader(FileSystem fs, Path externalTableInfo) throws IOEx if (!fileSystem.exists(externalTableInfo)) { return locationsToCopy; } - - int currentRetry = 0; - BufferedReader reader = null; - while (currentRetry < MAX_RETRIES) { - try { - reader = reader(fileSystem, externalTableInfo); - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] splits = line.split(FIELD_SEPARATOR); - locationsToCopy + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + BufferedReader reader = null; + try { + reader = reader(fileSystem, externalTableInfo); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + String[] splits = line.split(FIELD_SEPARATOR); + locationsToCopy .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8)); - } - return locationsToCopy; - } catch (IOException e) { - currentRetry++; - if (currentRetry < MAX_RETRIES) { + } + return locationsToCopy; + } finally { closeQuietly(reader); - LOG.warn("failed to read {}", externalTableInfo.toString(), e); - } else { - LOG.error("failed to read {}", externalTableInfo.toString(), e); - throw e; } - } finally { - closeQuietly(reader); - } + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } - // we should never reach here - throw new IllegalStateException("we should never reach this condition"); } private static void closeQuietly(BufferedReader reader) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 3c3dc444db..f2b09ccd38 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -161,8 +161,9 @@ private void initiateAuthorizationLoadTask() throws SemanticException { } childTasks.add(rangerLoadTask); } else { - throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) - + " not supported for replication "); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Authorizer " + + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) + + " not supported for replication ", "ranger")); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java index dd72f83194..e49664d9ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java @@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.SemanticException; import java.io.InputStream; @@ -30,14 +31,14 @@ */ public interface AtlasRestClient { - InputStream exportData(AtlasExportRequest request) throws Exception; + InputStream exportData(AtlasExportRequest request) throws SemanticException; AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception; - AtlasServer getServer(String endpoint) throws SemanticException; + AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException; String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName) throws SemanticException; - boolean getStatus() throws SemanticException; + boolean getStatus(HiveConf conf) throws SemanticException; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java index 6f24f57209..6644947b91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java @@ -23,6 +23,7 @@ import org.apache.atlas.AtlasException; import org.apache.commons.configuration.ConfigurationConverter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -64,7 +65,8 @@ public AtlasRestClient getClient(HiveConf conf) throws SemanticException { private AtlasRestClient create() throws SemanticException { if (baseUrls == null || baseUrls.length == 0) { - throw new SemanticException("baseUrls is not set."); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("baseUrls is not set.", + "atlas")); } setUGInfo(); initializeAtlasApplicationProperties(); @@ -92,7 +94,7 @@ private void initializeAtlasApplicationProperties() throws SemanticException { props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true"); ApplicationProperties.set(ConfigurationConverter.getConfiguration(props)); } catch (AtlasException e) { - throw new SemanticException(e); + throw new 
SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format(), e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index 71e51fb5cc..c67553b56c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -29,7 +29,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +87,7 @@ public AtlasRestClientImpl(AtlasClientV2 clientV2, HiveConf conf) { } } - public InputStream exportData(AtlasExportRequest request) throws Exception { + public InputStream exportData(AtlasExportRequest request) throws SemanticException { LOG.debug("exportData: {}" + request); return invokeWithRetry(new Callable() { @Override @@ -125,17 +127,15 @@ private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request return new AtlasImportResult(request, "", "", "", 0L); } - public AtlasServer getServer(String endpoint) throws SemanticException { + public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(Exception.class).build(); try { - return clientV2.getServer(endpoint); - } catch (AtlasServiceException e) { - int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1; - if (statusCode != NOT_FOUND.getStatusCode()) { - throw new SemanticException("Exception while getServer ", e.getCause()); - } - LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage()); + return retryable.executeCallable(() -> clientV2.getServer(endpoint)); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } - return null; } public String getEntityGuid(final String entityType, @@ -149,12 +149,7 @@ public String getEntityGuid(final String entityType, try { AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout( - new Callable() { - @Override - public AtlasEntity.AtlasEntityWithExtInfo call() throws Exception { - return clientV2.getEntityByAttribute(entityType, attributes); - } - }, entityApiTimeOut, TimeUnit.SECONDS); + () -> clientV2.getEntityByAttribute(entityType, attributes), entityApiTimeOut, TimeUnit.SECONDS); if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) { LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}", @@ -165,7 +160,8 @@ public String getEntityGuid(final String entityType, } catch (AtlasServiceException e) { int statusCode = e.getStatus() != null ? 
e.getStatus().getStatusCode() : -1; if (statusCode != NOT_FOUND.getStatusCode()) { - throw new SemanticException("Exception while getEntityGuid ", e.getCause()); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Exception " + + "while getEntityGuid ", "atlas"), e.getCause()); } LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}", entityType, attributeName, qualifiedName, e.getMessage()); @@ -175,11 +171,14 @@ public String getEntityGuid(final String entityType, } } - public boolean getStatus() throws SemanticException { + public boolean getStatus(HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(Exception.class).build(); try { - return clientV2.isServerReady(); - } catch (AtlasServiceException e) { - throw new SemanticException(e.getCause()); + return retryable.executeCallable(() -> clientV2.isServerReady()); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java index 59dcbf7c3f..d88fe643c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java @@ -22,6 +22,7 @@ import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.hadoop.hive.conf.HiveConf; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -42,7 +43,7 @@ public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo at return new AtlasImportResult(request, "", "", "", 0L); } - public AtlasServer getServer(String endpoint) { + public AtlasServer getServer(String endpoint, HiveConf conf) { return new AtlasServer(); } @@ -51,7 +52,7 @@ public String getEntityGuid(final String entityType, return UUID.randomUUID().toString(); } - public boolean getStatus() { + public boolean getStatus(HiveConf conf) { return true; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java index 25471a445f..6ddb114d43 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java @@ -20,6 +20,8 @@ import com.sun.jersey.api.client.UniformInterfaceException; import org.apache.atlas.AtlasServiceException; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +43,7 @@ protected double backOff; protected int maxJitterInSeconds; - protected T invokeWithRetry(Callable func, T defaultReturnValue) throws Exception { + protected T invokeWithRetry(Callable func, T defaultReturnValue) throws SemanticException { long startTime = System.currentTimeMillis(); long delay = this.initialDelayInSeconds; while (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { @@ -58,7 +60,7 @@ return null; } LOG.error(func.getClass().getName(), e); - throw new Exception(e); + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e); } } return defaultReturnValue; @@ 
-100,7 +102,7 @@ private boolean processInvalidParameterException(Exception e) { || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP)); } - private boolean processImportExportLockException(Exception e, long delay) throws Exception { + private boolean processImportExportLockException(Exception e, long delay) throws SemanticException { if (!(e instanceof AtlasServiceException)) { return false; } @@ -111,7 +113,7 @@ private boolean processImportExportLockException(Exception e, long delay) throws Thread.sleep(delay); } catch (InterruptedException intEx) { LOG.error("Pause wait interrupted!", intEx); - throw new Exception(intEx); + throw new SemanticException(intEx); } return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index adaaa02af2..e8026f74df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.security.UserGroupInformation; @@ -83,7 +84,8 @@ public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, HiveConf hiveConf)throws SemanticException { LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint); if (StringUtils.isEmpty(rangerHiveServiceName)) { - throw new SemanticException("Ranger Service Name cannot be empty"); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service Name " + + "cannot be empty", "ranger")); } Retryable retryable = Retryable.builder() .withHiveConf(hiveConf) @@ -92,7 +94,7 @@ public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName)); } catch (Exception e) { - throw new SemanticException(e); + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @@ -361,7 +363,7 @@ public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyLi return retryable.executeCallable(() -> writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName, stagingDirPath, conf)); } catch (Exception e) { - throw new SemanticException(e); + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @@ -392,7 +394,7 @@ public boolean checkConnection(String url, HiveConf hiveConf) throws SemanticExc try { return retryable.executeCallable(() -> checkConnectionPlain(url)); } catch (Exception e) { - throw new SemanticException(e); + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @@ -408,7 +410,8 @@ boolean checkConnectionPlain(String url) { public List addDenyPolicies(List rangerPolicies, String rangerServiceName, String sourceDb, String targetDb) throws SemanticException { if (StringUtils.isEmpty(rangerServiceName)) { - throw new SemanticException("Ranger Service Name cannot be empty"); + throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE.format("Ranger Service " + + "Name cannot be empty", "ranger")); } RangerPolicy 
denyRangerPolicy = new RangerPolicy(); denyRangerPolicy.setService(rangerServiceName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 3fb271dab1..c386aeeda5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -27,7 +27,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.HiveFatalException; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -43,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.stream.Collectors; public class CopyUtils { @@ -297,39 +300,27 @@ private boolean isSourceFileMismatch(FileSystem sourceFs, ReplChangeManager.File return false; } - private UserGroupInformation getProxyUser() throws LoginException, IOException { + private UserGroupInformation getProxyUser() throws IOException { if (copyAsUser == null) { return null; } - UserGroupInformation proxyUser = null; - int currentRetry = 0; - while (currentRetry <= MAX_IO_RETRY) { - try { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { + UserGroupInformation proxyUser = null; UserGroupInformation ugi = Utils.getUGI(); String currentUser = ugi.getShortUserName(); if (!currentUser.equals(copyAsUser)) { proxyUser = UserGroupInformation.createProxyUser( - copyAsUser, UserGroupInformation.getLoginUser()); + copyAsUser, UserGroupInformation.getLoginUser()); } return proxyUser; - } catch (IOException e) { - currentRetry++; - if (currentRetry <= MAX_IO_RETRY) { - LOG.warn("Unable to get UGI info", e); - } else { - LOG.error("Unable to get UGI info", e); - throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); - } - int sleepTime = FileUtils.getSleepTime(currentRetry); - LOG.info("Sleep for " + sleepTime + " milliseconds before retry " + (currentRetry)); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - LOG.info("Sleep interrupted", timerEx.getMessage()); - } - } + }); + } catch (Exception e) { + throw new IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } - return null; } // Copy without retry
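Note on the pattern this patch converges on: the hunks above drop the ad-hoc retry loops (MAX_IO_ERROR_RETRY, MAX_COPY_RETRY, MAX_RETRIES) in favour of the Retryable builder plus the new REPL_RETRY_EXHAUSTED / REPL_INVALID_CONFIG_FOR_SERVICE error codes. A minimal sketch of that pattern, assuming only the Retryable calls that appear in the hunks above (the ReadAckFile class and its readLastEventId method are hypothetical, shown purely for illustration):

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class ReadAckFile {
  // Hypothetical helper: reads the last event id from an ack file with the same
  // retry/fail-fast wiring used by the tasks changed in this patch.
  long readLastEventId(Path ackFile, HiveConf conf) throws SemanticException {
    Retryable retryable = Retryable.builder()
        .withHiveConf(conf)                                  // retry/backoff settings come from HiveConf
        .withRetryOnException(IOException.class)             // transient I/O errors are retried
        .withFailOnException(FileNotFoundException.class)    // permanent errors fail immediately
        .build();
    try {
      return retryable.executeCallable(() -> {
        FileSystem fs = ackFile.getFileSystem(conf);
        try (BufferedReader br = new BufferedReader(
            new InputStreamReader(fs.open(ackFile), Charset.defaultCharset()))) {
          return Long.parseLong(br.readLine());
        }
      });
    } catch (Exception e) {
      // Retries exhausted (or a fail-fast exception): surface the new error code to the caller.
      throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
    }
  }
}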