diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 45d67c61ba..2f6b9185f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -194,9 +194,13 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo, atlasReplInfo.getSrcCluster()); inputStream = atlasRestClient.exportData(exportRequest); - FileSystem fs = atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf()); - Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); - numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf); + if (inputStream == null) { + LOG.info("There is no Atlas metadata to be exported"); + } else { + FileSystem fs = atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf()); + Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); + numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf); + } } catch (SemanticException ex) { throw ex; } catch (Exception ex) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 61b3652829..3c6fdaad41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -180,7 +180,8 @@ private void initiateAuthorizationLoadTask() throws SemanticException { private void addAtlasLoadTask() throws HiveException { Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir); - AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir, + String targetDbName = StringUtils.isEmpty(work.dbNameToLoadIn) ? work.getSourceDbName() : work.dbNameToLoadIn; + AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), targetDbName, atlasDumpDir, work.getMetricCollector()); Task atlasLoadTask = TaskFactory.get(atlasLoadWork, conf); if (childTasks == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index e4b294d436..ed7485d241 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -95,7 +95,7 @@ public InputStream call() throws Exception { SecurityUtils.reloginExpiringKeytabUser(); return clientV2.exportData(request); } - }, null); + }); } public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception { @@ -103,6 +103,7 @@ public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo at Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf()); if (!fs.exists(exportFilePath)) { + LOG.info("There is nothing to load, returning the default result."); return defaultResult; } LOG.debug("Atlas import data request: {}" + request); @@ -120,7 +121,7 @@ public AtlasImportResult call() throws Exception { } } } - }, defaultResult); + }); } private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java index 6ddb114d43..c84567ebff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java @@ -43,10 +43,10 @@ protected double backOff; protected int maxJitterInSeconds; - protected T invokeWithRetry(Callable func, T defaultReturnValue) throws SemanticException { + protected T invokeWithRetry(Callable func) throws SemanticException { long startTime = System.currentTimeMillis(); long delay = this.initialDelayInSeconds; - while (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { + while (true) { try { LOG.debug("Retrying method: {}", func.getClass().getName(), null); return func.call(); @@ -54,16 +54,19 @@ if (processImportExportLockException(e, delay)) { //retry case. compute next sleep time delay = getNextDelay(delay); + if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e); + } continue; } if (processInvalidParameterException(e)) { + LOG.info("There is nothing to export/import."); return null; } LOG.error(func.getClass().getName(), e); throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e); } } - return defaultReturnValue; } private long getNextDelay(long currentDelay) { @@ -109,8 +112,8 @@ private boolean processImportExportLockException(Exception e, long delay) throws String excMessage = e.getMessage() == null ? "" : e.getMessage(); if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) { try { - LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", delay); - Thread.sleep(delay); + LOG.info("Atlas in-progress operation detected. Will pause for: {} seconds", delay); + Thread.sleep(delay * 1000L); } catch (InterruptedException intEx) { LOG.error("Pause wait interrupted!", intEx); throw new SemanticException(intEx); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index 31081ab1e0..5b2fe4ee29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -202,9 +202,13 @@ public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList ranger Retryable retryable = Retryable.builder() .withHiveConf(hiveConf) .withRetryOnException(Exception.class).build(); - return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList, - rangerPoliciesJsonFileName, - serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); + try { + return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList, + rangerPoliciesJsonFileName, + serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); + } catch (Exception e) { + throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); + } } private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExportPolicyList, diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java index fbc6dfc229..935dc993ae 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java @@ -18,16 +18,21 @@ package org.apache.hadoop.hive.ql.exec.repl; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientImpl; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplState; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Test; @@ -43,7 +48,11 @@ import static org.mockito.ArgumentMatchers.any; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.mockStatic; @@ -114,4 +123,56 @@ public void testAtlasRestClientBuilder() throws SemanticException, IOException { AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf); Assert.assertTrue(atlasClient != null); } + + @Test + public void testRetryingClientTimeBased() throws SemanticException, IOException, AtlasServiceException { + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + AtlasExportRequest exportRequest = mock(AtlasExportRequest.class); + String exportResponseData = "dumpExportContent"; + InputStream exportedMetadataIS = new ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8)); + when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS); + when(exportRequest.toString()).thenReturn("dummyExportRequest"); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(1L); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient; + InputStream inputStream = atlasRestClientImpl.exportData(exportRequest); + ArgumentCaptor expReqCaptor = ArgumentCaptor.forClass(AtlasExportRequest.class); + Mockito.verify(atlasClientV2, Mockito.times(1)).exportData(expReqCaptor.capture()); + Assert.assertEquals(expReqCaptor.getValue().toString(), "dummyExportRequest"); + byte[] exportResponseDataReadBytes = new byte[exportResponseData.length()]; + inputStream.read(exportResponseDataReadBytes); + String exportResponseDataReadString = new String(exportResponseDataReadBytes, StandardCharsets.UTF_8); + Assert.assertEquals(exportResponseData, exportResponseDataReadString); + } + + @Test + public void testRetryingClientTimeBasedExhausted() throws AtlasServiceException { + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + AtlasExportRequest exportRequest = mock(AtlasExportRequest.class); + AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); + when(atlasServiceException.getMessage()).thenReturn("import or export is in progress"); + when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException); + when(exportRequest.toString()).thenReturn("dummyExportRequest"); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)).thenReturn(20L); + when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient; + InputStream inputStream = null; + try { + inputStream = atlasRestClientImpl.exportData(exportRequest); + Assert.fail("Should have thrown SemanticException."); + } catch (SemanticException ex) { + Assert.assertTrue(ex.getMessage().contains("Retry exhausted for retryable error code")); + Assert.assertTrue(atlasServiceException == ex.getCause()); + } + ArgumentCaptor expReqCaptor = ArgumentCaptor.forClass(AtlasExportRequest.class); + Mockito.verify(atlasClientV2, Mockito.times(3)).exportData(expReqCaptor.capture()); + for (AtlasExportRequest atlasExportRequest: expReqCaptor.getAllValues()) { + Assert.assertEquals(atlasExportRequest.toString(), "dummyExportRequest"); + } + Assert.assertTrue(inputStream == null); + } }