diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index dc24c87891..c0156c380d 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -613,12 +613,18 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal true, "This configuration will add a deny policy on the target database for all users except hive" + " to avoid any update to the target database"), + REPL_RANGER_CLIENT_READ_TIMEOUT("hive.repl.ranger.client.read.timeout", "300s", + new TimeValidator(TimeUnit.SECONDS), "Ranger client read timeout for Ranger REST API calls."), REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false, "Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."), REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null, "Atlas endpoint of the current cluster hive database is getting replicated from/to."), REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null, "Target hive database name Atlas metadata of source hive database is being replicated to."), + REPL_ATLAS_CLIENT_READ_TIMEOUT("hive.repl.atlas.client.read.timeout", "7200s", + new TimeValidator(TimeUnit.SECONDS), "Atlas client read timeout for Atlas REST API calls."), + REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT("hive.repl.external.client.connect.timeout", "10s", + new TimeValidator(TimeUnit.SECONDS), "Client connect timeout for REST API calls to external service."), REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null, "Name of the source cluster for the replication."), REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java index fa0ca5f634..75cb45badd 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** * Builder for AtlasRestClient. @@ -42,6 +43,8 @@ private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address"; private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos"; private static final String URL_SEPERATOR = ","; + public static final String ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS = "atlas.client.connectTimeoutMSecs"; + public static final String ATLAS_PROPERTY_READ_TIMEOUT_IN_MS = "atlas.client.readTimeoutMSecs"; private UserGroupInformation userGroupInformation; protected String incomingUrl; @@ -69,7 +72,7 @@ private AtlasRestClient create(HiveConf conf) throws SemanticException { ReplUtils.REPL_ATLAS_SERVICE)); } setUGInfo(); - initializeAtlasApplicationProperties(); + initializeAtlasApplicationProperties(conf); AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation, this.userGroupInformation.getShortUserName(), baseUrls); return new AtlasRestClientImpl(clientV2, conf); @@ -85,9 +88,13 @@ private AtlasRestClientBuilder setUGInfo() throws SemanticException { return this; } - private void initializeAtlasApplicationProperties() throws SemanticException { + private void initializeAtlasApplicationProperties(HiveConf conf) throws SemanticException { try { Properties props = new Properties(); + props.setProperty(ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS, String.valueOf( + conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS))); + props.setProperty(ATLAS_PROPERTY_READ_TIMEOUT_IN_MS, String.valueOf( + conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS))); props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1"); 
props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0"); props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index 459c95f5b1..b14a44ffa3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -69,6 +69,7 @@ import java.util.LinkedHashMap; import java.util.Arrays; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; /** * RangerRestClientImpl to connect to Ranger and export policies. @@ -93,7 +94,7 @@ public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, .withRetryOnException(Exception.class).build(); try { return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, - dbName)); + dbName, hiveConf)); } catch (Exception e) { throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } @@ -102,10 +103,11 @@ public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, @VisibleForTesting RangerExportPolicyList exportRangerPoliciesPlain(String sourceRangerEndpoint, String rangerHiveServiceName, - String dbName) throws SemanticException, URISyntaxException { + String dbName, HiveConf hiveConf) + throws SemanticException, URISyntaxException { String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName); LOG.debug("Url to export policies from source Ranger: {}", finalUrl); - WebResource.Builder builder = getRangerResourceBuilder(finalUrl); + WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf); RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); ClientResponse clientResp = builder.get(ClientResponse.class); String response = null; 
@@ -205,7 +207,7 @@ public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList ranger try { return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName, - serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); + serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList, hiveConf)); } catch (Exception e) { throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } @@ -215,7 +217,7 @@ private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExport String rangerPoliciesJsonFileName, String serviceMapJsonFileName, String jsonServiceMap, String finalUrl, RangerExportPolicyList - rangerExportPolicyList) throws Exception { + rangerExportPolicyList, HiveConf hiveConf) throws Exception { ClientResponse clientResp = null; StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), @@ -227,7 +229,7 @@ private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExport MultiPart multipartEntity = null; try { multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); - WebResource.Builder builder = getRangerResourceBuilder(finalUrl); + WebResource.Builder builder = getRangerResourceBuilder(finalUrl, hiveConf); clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) .post(ClientResponse.class, multipartEntity); if (clientResp != null) { @@ -270,11 +272,16 @@ public String getRangerImportUrl(String rangerUrl, String dbName) throws URISynt return uriBuilder.build().toString(); } - private synchronized Client getRangerClient() { + @VisibleForTesting + synchronized Client getRangerClient(HiveConf hiveConf) { Client ret = null; ClientConfig config = new DefaultClientConfig(); config.getClasses().add(MultiPartWriter.class); 
config.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true); + config.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, + (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS)); + config.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, + (int) hiveConf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)); ret = Client.create(config); return ret; } @@ -398,16 +405,16 @@ public boolean checkConnection(String url, HiveConf hiveConf) throws SemanticExc .withHiveConf(hiveConf) .withRetryOnException(Exception.class).build(); try { - return retryable.executeCallable(() -> checkConnectionPlain(url)); + return retryable.executeCallable(() -> checkConnectionPlain(url, hiveConf)); } catch (Exception e) { throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e); } } @VisibleForTesting - boolean checkConnectionPlain(String url) { + boolean checkConnectionPlain(String url, HiveConf hiveConf) { WebResource.Builder builder; - builder = getRangerResourceBuilder(url); + builder = getRangerResourceBuilder(url, hiveConf); ClientResponse clientResp = builder.get(ClientResponse.class); return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); } @@ -478,8 +485,8 @@ boolean checkConnectionPlain(String url) { } - private WebResource.Builder getRangerResourceBuilder(String url) { - Client client = getRangerClient(); + private WebResource.Builder getRangerResourceBuilder(String url, HiveConf hiveConf) { + Client client = getRangerClient(hiveConf); WebResource webResource = client.resource(url); WebResource.Builder builder = webResource.getRequestBuilder(); return builder; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java index 5f79db03f1..99a51ed0be 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java @@ -24,6 +24,7 @@ import org.apache.atlas.AtlasServiceException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.commons.configuration.ConfigurationConverter; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; @@ -42,6 +43,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; @@ -55,6 +57,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -65,7 +68,7 @@ * Unit test class for testing Atlas metadata Dump. 
*/ @RunWith(PowerMockRunner.class) -@PrepareForTest({LoggerFactory.class, UserGroupInformation.class}) +@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class}) public class TestAtlasDumpTask { @Mock @@ -218,6 +221,27 @@ public void testAtlasServerEntityRetryExhausted() throws AtlasServiceException { Mockito.verify(atlasClientV2, Mockito.times(4)).getServer(getServerReqCaptor.capture()); } + @Test + public void testAtlasClientTimeouts() throws Exception { + when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, + TimeUnit.MILLISECONDS)).thenReturn(20L); + when(conf.getTimeVar(HiveConf.ConfVars.REPL_ATLAS_CLIENT_READ_TIMEOUT, TimeUnit.MILLISECONDS)).thenReturn(500L); + mockStatic(UserGroupInformation.class); + when(UserGroupInformation.getLoginUser()).thenReturn(mock(UserGroupInformation.class)); + mockStatic(ConfigurationConverter.class); + when(ConfigurationConverter.getConfiguration(Mockito.any(Properties.class))).thenCallRealMethod(); + AtlasRestClientBuilder atlasRestClientBuilder = new AtlasRestClientBuilder("http://localhost:31000"); + AtlasRestClient atlasClient = atlasRestClientBuilder.getClient(conf); + Assert.assertNotNull(atlasClient); + ArgumentCaptor propsCaptor = ArgumentCaptor.forClass(Properties.class); + PowerMockito.verifyStatic(ConfigurationConverter.class, Mockito.times(1)); + ConfigurationConverter.getConfiguration(propsCaptor.capture()); + Assert.assertEquals("20", propsCaptor.getValue().getProperty( + AtlasRestClientBuilder.ATLAS_PROPERTY_CONNECT_TIMEOUT_IN_MS)); + Assert.assertEquals("500", propsCaptor.getValue().getProperty( + AtlasRestClientBuilder.ATLAS_PROPERTY_READ_TIMEOUT_IN_MS)); + } + private void setupConfForRetry() { when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L); when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L); diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java index 5f41488ef8..10825f3657 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.repl.ranger; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.config.ClientConfig; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; @@ -74,12 +76,12 @@ public void setup() throws Exception { @Test public void testSuccessSimpleAuthCheckConnection() throws Exception { Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); - Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true); + Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString(), Mockito.any(HiveConf.class))).thenReturn(true); Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenCallRealMethod(); mockClient.checkConnection("http://localhost:6080/ranger", conf); ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); Mockito.verify(mockClient, - Mockito.times(1)).checkConnectionPlain(urlCaptor.capture()); + Mockito.times(1)).checkConnectionPlain(urlCaptor.capture(), Mockito.any(HiveConf.class)); Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class); Mockito.verify(userGroupInformation, @@ -89,8 +91,8 @@ public void testSuccessSimpleAuthCheckConnection() throws Exception { @Test public void testSuccessSimpleAuthRangerExport() throws Exception { Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); - 
Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) - .thenReturn(new RangerExportPolicyList()); + Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.any(HiveConf.class))).thenReturn(new RangerExportPolicyList()); Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any())) .thenCallRealMethod(); @@ -110,4 +112,16 @@ public void testSuccessSimpleAuthRangerExport() throws Exception { Mockito.verify(userGroupInformation, Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture()); } + + @Test + public void testRangerClientTimeouts() { + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_EXTERNAL_CLIENT_CONNECT_TIMEOUT, + TimeUnit.MILLISECONDS)).thenReturn(20L); + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RANGER_CLIENT_READ_TIMEOUT, + TimeUnit.MILLISECONDS)).thenReturn(500L); + Mockito.when(mockClient.getRangerClient(Mockito.any(HiveConf.class))).thenCallRealMethod(); + Client client = mockClient.getRangerClient(conf); + Assert.assertEquals(20, client.getProperties().get(ClientConfig.PROPERTY_CONNECT_TIMEOUT)); + Assert.assertEquals(500, client.getProperties().get(ClientConfig.PROPERTY_READ_TIMEOUT)); + } }