diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index ed7485d241..73977497e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -133,7 +133,19 @@ public AtlasServer getServer(String endpoint, HiveConf conf) throws SemanticExce .withHiveConf(conf) .withRetryOnException(AtlasServiceException.class).build(); try { - return retryable.executeCallable(() -> clientV2.getServer(endpoint)); + return retryable.executeCallable((Callable) () -> { + try { + return clientV2.getServer(endpoint); + } catch (AtlasServiceException e) { + int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1; + if (NOT_FOUND.getStatusCode() == statusCode) { + // Atlas server entity is initialized on first import/export o/p. + LOG.info("Atlas server entity is not found"); + return null; + } + throw e; + } + }); } catch (Exception e) { throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java index 935dc993ae..16cbea5d6b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.repl; +import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.AtlasClientV2; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasServer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; @@ -32,7 +34,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplState; import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Test; @@ -54,6 +55,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.when; @@ -154,10 +156,7 @@ public void testRetryingClientTimeBasedExhausted() throws AtlasServiceException when(atlasServiceException.getMessage()).thenReturn("import or export is in progress"); when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException); when(exportRequest.toString()).thenReturn("dummyExportRequest"); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); - when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)).thenReturn(20L); - when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f); + setupConfForRetry(); AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient; InputStream inputStream = null; @@ -175,4 +174,58 @@ public void testRetryingClientTimeBasedExhausted() throws AtlasServiceException } Assert.assertTrue(inputStream == null); } + + @Test + public void testAtlasServerEntity() throws AtlasServiceException, SemanticException { + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + AtlasServer atlasServer = mock(AtlasServer.class); + when(atlasClientV2.getServer(anyString())).thenReturn(atlasServer); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasServer atlasServerRet = atlasClient.getServer("src", conf); + Assert.assertTrue(atlasServer == atlasServerRet); + } + + @Test + public void testAtlasServerEntityNotFound() throws AtlasServiceException, SemanticException { + setupConfForRetry(); + AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); + ClientResponse.Status status = mock(ClientResponse.Status.class); + when(atlasServiceException.getStatus()).thenReturn(status); + when(status.getStatusCode()).thenReturn(404); + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + when(atlasClientV2.getServer(anyString())).thenThrow(atlasServiceException); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + AtlasServer atlasServerRet = atlasClient.getServer("src", conf); + Assert.assertNull(atlasServerRet); + ArgumentCaptor getServerReqCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(atlasClientV2, Mockito.times(1)).getServer(getServerReqCaptor.capture()); + } + + @Test + public void testAtlasServerEntityRetryExhausted() throws AtlasServiceException { + setupConfForRetry(); + AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); + ClientResponse.Status status = mock(ClientResponse.Status.class); + when(atlasServiceException.getStatus()).thenReturn(status); + when(status.getStatusCode()).thenReturn(400); + AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class); + when(atlasClientV2.getServer(anyString())).thenThrow(atlasServiceException); + AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf); + try { + atlasClient.getServer("src", conf); + Assert.fail("Should have thrown SemanticException."); + } catch (SemanticException ex) { + Assert.assertTrue(ex.getMessage().contains("Retry exhausted for retryable error code")); + Assert.assertTrue(atlasServiceException == ex.getCause()); + } + ArgumentCaptor getServerReqCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(atlasClientV2, Mockito.times(4)).getServer(getServerReqCaptor.capture()); + } + + private void setupConfForRetry() { + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)).thenReturn(20L); + when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f); + } }