diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 502dabbd06..94ea3c2d9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; @@ -79,6 +80,7 @@ public AtlasDumpTask() { @Override public int execute() { try { + SecurityUtils.reloginExpiringKeytabUser(); AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:", atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java index 534c85da32..b24b3d62d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; @@ -68,6 +69,7 @@ public AtlasLoadTask() { @Override public int execute() { try { + SecurityUtils.reloginExpiringKeytabUser(); AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); Map metricMap = new HashMap<>(); metricMap.put(ReplUtils.MetricName.ENTITIES.name(), 0L); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 92ca6ea6ed..7a0259fc0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient; @@ -80,6 +81,7 @@ public int execute() { long exportCount = 0; Path filePath = null; LOG.info("Exporting Ranger Metadata"); + SecurityUtils.reloginExpiringKeytabUser(); Map metricMap = new HashMap<>(); metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L); work.getMetricCollector().reportStageStart(getName(), metricMap); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index fa57efd2fc..20f84012c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient; @@ -82,6 +83,7 @@ public int execute() { LOG.info("Importing Ranger Metadata"); RangerExportPolicyList rangerExportPolicyList = null; List rangerPolicies = null; + SecurityUtils.reloginExpiringKeytabUser(); if (rangerRestClient == null) { rangerRestClient = getRangerRestClient(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 8d76557a86..bb0ae1f8a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -152,6 +153,7 @@ public String getName() { @Override public int execute() { try { + SecurityUtils.reloginExpiringKeytabUser(); if (work.tableDataCopyIteratorsInitialized()) { initiateDataCopyTasks(); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 87543f20b9..5ac9a05847 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc; @@ -105,6 +106,7 @@ public StageType getType() { @Override public int execute() { try { + SecurityUtils.reloginExpiringKeytabUser(); Task rootTask = work.getRootTask(); if (rootTask != null) { rootTask.setChildTasks(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index c8d738ef11..7a3bf61268 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; @@ -82,6 +83,7 @@ public InputStream exportData(AtlasExportRequest request) throws Exception { return invokeWithRetry(new Callable() { @Override public InputStream call() throws Exception { + SecurityUtils.reloginExpiringKeytabUser(); return clientV2.exportData(request); } }, null); @@ -100,6 +102,7 @@ public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo at public AtlasImportResult call() throws Exception { InputStream is = null; try { + SecurityUtils.reloginExpiringKeytabUser(); is = fs.open(exportFilePath); return clientV2.importData(request, is); } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index 91389eacc0..1b167231fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.ranger; +import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.sun.jersey.api.client.Client; @@ -35,7 +36,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.client.utils.URIBuilder; import org.eclipse.jetty.util.MultiPartWriter; import org.slf4j.Logger; @@ -54,6 +57,8 @@ import java.net.URISyntaxException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map; import java.util.ArrayList; @@ -80,34 +85,13 @@ public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, Retry retriable = new Retry(Exception.class) { @Override public RangerExportPolicyList execute() throws Exception { - String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName); - LOG.debug("Url to export policies from source Ranger: {}", finalUrl); - WebResource.Builder builder = getRangerResourceBuilder(finalUrl); - RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); - ClientResponse clientResp = builder.get(ClientResponse.class); - String response = null; - if (clientResp != null) { - if (clientResp.getStatus() == HttpServletResponse.SC_OK) { - Gson gson = new GsonBuilder().create(); - response = clientResp.getEntity(String.class); - LOG.debug("Response received for ranger export {} ", response); - if (StringUtils.isNotEmpty(response)) { - rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class); - return rangerExportPolicyList; - } - } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { - LOG.debug("Ranger policy export request returned empty list"); - return rangerExportPolicyList; - } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { - throw new SemanticException("Authentication Failure while communicating to Ranger admin"); - } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) { - throw new SemanticException("Authorization Failure while communicating to Ranger admin"); - } - } - if (StringUtils.isEmpty(response)) { - LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); + if (UserGroupInformation.isSecurityEnabled()) { + SecurityUtils.reloginExpiringKeytabUser(); + return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction) () -> + exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName)); + } else { + return exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName); } - return null; } }; try { @@ -117,6 +101,40 @@ public RangerExportPolicyList execute() throws Exception { } } + @VisibleForTesting + RangerExportPolicyList exportRangerPoliciesPlain(String sourceRangerEndpoint, + String rangerHiveServiceName, + String dbName) throws SemanticException, URISyntaxException { + String finalUrl = getRangerExportUrl(sourceRangerEndpoint, rangerHiveServiceName, dbName); + LOG.debug("Url to export policies from source Ranger: {}", finalUrl); + WebResource.Builder builder = getRangerResourceBuilder(finalUrl); + RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); + ClientResponse clientResp = builder.get(ClientResponse.class); + String response = null; + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_OK) { + Gson gson = new GsonBuilder().create(); + response = clientResp.getEntity(String.class); + LOG.debug("Response received for ranger export {} ", response); + if (StringUtils.isNotEmpty(response)) { + rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class); + return rangerExportPolicyList; + } + } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy export request returned empty list"); + return rangerExportPolicyList; + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new SemanticException("Authentication Failure while communicating to Ranger admin"); + } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) { + throw new SemanticException("Authorization Failure while communicating to Ranger admin"); + } + } + if (StringUtils.isEmpty(response)) { + LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); + } + return null; + } + public String getRangerExportUrl(String sourceRangerEndpoint, String rangerHiveServiceName, String dbName) throws URISyntaxException { URIBuilder uriBuilder = new URIBuilder(sourceRangerEndpoint); @@ -185,50 +203,15 @@ public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList ranger Retry retriable = new Retry(Exception.class) { @Override public RangerExportPolicyList execute() throws Exception { - ClientResponse clientResp = null; - - StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", - new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), - rangerPoliciesJsonFileName); - StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson", - new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName); - - FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); - MultiPart multipartEntity = null; - try { - multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); - WebResource.Builder builder = getRangerResourceBuilder(finalUrl); - clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) - .post(ClientResponse.class, multipartEntity); - if (clientResp != null) { - if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { - LOG.debug("Ranger policy import finished successfully"); - - } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { - throw new Exception("Authentication Failure while communicating to Ranger admin"); - } else { - throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs."); - } - } - } finally { - try { - if (filePartPolicies != null) { - filePartPolicies.cleanup(); - } - if (filePartServiceMap != null) { - filePartServiceMap.cleanup(); - } - if (formDataMultiPart != null) { - formDataMultiPart.close(); - } - if (multipartEntity != null) { - multipartEntity.close(); - } - } catch (IOException e) { - LOG.error("Exception occurred while closing resources: {}", e); - } + if (UserGroupInformation.isSecurityEnabled()) { + SecurityUtils.reloginExpiringKeytabUser(); + return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction) () -> + importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName, + serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); + } else { + return importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName, + serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList); } - return rangerExportPolicyList; } }; try { @@ -238,6 +221,56 @@ public RangerExportPolicyList execute() throws Exception { } } + private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExportPolicyList, + String rangerPoliciesJsonFileName, + String serviceMapJsonFileName, String jsonServiceMap, + String finalUrl, RangerExportPolicyList + rangerExportPolicyList) throws Exception { + ClientResponse clientResp = null; + StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", + new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), + rangerPoliciesJsonFileName); + StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson", + new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName); + + FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); + MultiPart multipartEntity = null; + try { + multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); + WebResource.Builder builder = getRangerResourceBuilder(finalUrl); + clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) + .post(ClientResponse.class, multipartEntity); + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy import finished successfully"); + + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new Exception("Authentication Failure while communicating to Ranger admin"); + } else { + throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs."); + } + } + } finally { + try { + if (filePartPolicies != null) { + filePartPolicies.cleanup(); + } + if (filePartServiceMap != null) { + filePartServiceMap.cleanup(); + } + if (formDataMultiPart != null) { + formDataMultiPart.close(); + } + if (multipartEntity != null) { + multipartEntity.close(); + } + } catch (IOException e) { + LOG.error("Exception occurred while closing resources: {}", e); + } + } + return rangerExportPolicyList; + } + public String getRangerImportUrl(String rangerUrl, String dbName) throws URISyntaxException { URIBuilder uriBuilder = new URIBuilder(rangerUrl); uriBuilder.setPath(RANGER_REST_URL_IMPORTJSONFILE); @@ -376,10 +409,12 @@ public boolean checkConnection(String url) throws SemanticException { Retry retriable = new Retry(Exception.class) { @Override public Boolean execute() throws Exception { - WebResource.Builder builder; - builder = getRangerResourceBuilder(url); - ClientResponse clientResp = builder.get(ClientResponse.class); - return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); + if (UserGroupInformation.isSecurityEnabled()) { + SecurityUtils.reloginExpiringKeytabUser(); + return UserGroupInformation.getLoginUser().doAs((PrivilegedAction) () -> checkConnectionPlain(url)); + } else { + return checkConnectionPlain(url); + } } }; try { @@ -389,6 +424,14 @@ public Boolean execute() throws Exception { } } + @VisibleForTesting + boolean checkConnectionPlain(String url) { + WebResource.Builder builder; + builder = getRangerResourceBuilder(url); + ClientResponse clientResp = builder.get(ClientResponse.class); + return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); + } + @Override public List addDenyPolicies(List rangerPolicies, String rangerServiceName, String sourceDb, String targetDb) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java index 61cc34881d..ba19e28a24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -77,6 +77,9 @@ public void reportStageEnd(String stageName, Status status, long lastReplId) thr Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); + if (Status.FAILED == status) { + progress.setStatus(Status.FAILED); + } replicationMetric.setProgress(progress); Metadata metadata = replicationMetric.getMetadata(); metadata.setLastReplId(lastReplId); @@ -92,6 +95,9 @@ public void reportStageEnd(String stageName, Status status) throws SemanticExcep Stage stage = progress.getStageByName(stageName); stage.setStatus(status); stage.setEndTime(System.currentTimeMillis()); + if (Status.FAILED == status) { + progress.setStatus(Status.FAILED); + } replicationMetric.setProgress(progress); metricCollector.addMetric(replicationMetric); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java new file mode 100644 index 0000000000..12afd8e7b5 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +/** + * Unit test class for testing Ranger Dump. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({UserGroupInformation.class}) +public class TestRangerRestClient { + + @Mock + private RangerRestClientImpl mockClient; + + @Mock + private UserGroupInformation userGroupInformation; + + @Before + public void setup() throws Exception { + PowerMockito.mockStatic(UserGroupInformation.class); + Mockito.when(UserGroupInformation.getLoginUser()).thenReturn(userGroupInformation); + Mockito.when(userGroupInformation.doAs((PrivilegedAction) Mockito.any())).thenCallRealMethod(); + Mockito.when(mockClient.getRangerExportUrl(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenCallRealMethod(); + Mockito.when(mockClient.getRangerImportUrl(Mockito.anyString(), Mockito.anyString())) + .thenCallRealMethod(); + } + + @Test + public void testSuccessSimpleAuthCheckConnection() throws Exception { + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); + Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true); + Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod(); + mockClient.checkConnection("http://localhost:6080/ranger"); + ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockClient, + Mockito.times(1)).checkConnectionPlain(urlCaptor.capture()); + Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); + ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class); + Mockito.verify(userGroupInformation, + Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture()); + } + + @Test + public void testSuccessKerberosAuthCheckConnection() throws Exception { + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true); + Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true); + Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod(); + mockClient.checkConnection("http://localhost:6080/ranger"); + ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockClient, + Mockito.times(1)).checkConnectionPlain(urlCaptor.capture()); + Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); + ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class); + Mockito.verify(userGroupInformation, + Mockito.times(3)).doAs(privilegedActionArgumentCaptor.capture()); + } + + @Test + public void testSuccessSimpleAuthRangerExport() throws Exception { + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); + Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(new RangerExportPolicyList()); + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenCallRealMethod(); + mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db", + "hive"); + ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor dbCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor serviceCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockClient, + Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(), + serviceCaptor.capture()); + Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); + Assert.assertEquals("db", dbCaptor.getValue()); + Assert.assertEquals("hive", serviceCaptor.getValue()); + ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class); + Mockito.verify(userGroupInformation, + Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture()); + } + + @Test + public void testSuccessKerberosAuthRangerExport() throws Exception { + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true); + Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(new RangerExportPolicyList()); + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenCallRealMethod(); + mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db", + "hive"); + ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor dbCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor serviceCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(mockClient, + Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(), + serviceCaptor.capture()); + Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); + Assert.assertEquals("db", dbCaptor.getValue()); + Assert.assertEquals("hive", serviceCaptor.getValue()); + ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor + .forClass(PrivilegedExceptionAction.class); + Mockito.verify(userGroupInformation, + Mockito.times(1)).doAs(privilegedActionArgumentCaptor.capture()); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java index 95de5a849e..625a6e161d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricCollector.java @@ -303,4 +303,18 @@ private void checkSuccess(ReplicationMetric actual, ReplicationMetric expected, } } + @Test + public void testSuccessStageFailure() throws Exception { + ReplicationMetricCollector bootstrapDumpMetricCollector = new BootstrapDumpMetricCollector("db", + "staging", conf); + Map metricMap = new HashMap<>(); + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) 10); + metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) 1); + bootstrapDumpMetricCollector.reportStageStart("dump", metricMap); + bootstrapDumpMetricCollector.reportStageEnd("dump", Status.FAILED); + List metricList = MetricCollector.getInstance().getMetrics(); + Assert.assertEquals(1, metricList.size()); + ReplicationMetric actualMetric = metricList.get(0); + Assert.assertEquals(Status.FAILED, actualMetric.getProgress().getStatus()); + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java index bf47d1cd77..d56365b924 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.metastore.annotation.NoReconnect; @@ -93,7 +94,7 @@ protected RetryingMetaStoreClient(Configuration conf, Class[] constructorArgT String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); localMetaStore = (msUri == null) || msUri.trim().isEmpty(); - reloginExpiringKeytabUser(); + SecurityUtils.reloginExpiringKeytabUser(); this.base = JavaUtils.newInstance(msClientClass, constructorArgTypes, constructorArgs); @@ -174,7 +175,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl while (true) { try { - reloginExpiringKeytabUser(); + SecurityUtils.reloginExpiringKeytabUser(); if (allowReconnect) { if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { @@ -325,28 +326,4 @@ private boolean hasConnectionLifeTimeReached(Method method) { return shouldReconnect; } - /** - * Relogin if login user is logged in using keytab - * Relogin is actually done by ugi code only if sufficient time has passed - * A no-op if kerberos security is not enabled - * @throws MetaException - */ - private void reloginExpiringKeytabUser() throws MetaException { - if(!UserGroupInformation.isSecurityEnabled()){ - return; - } - try { - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry - //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x) - if(ugi.isFromKeytab()){ - ugi.checkTGTAndReloginFromKeytab(); - } - } catch (IOException e) { - String msg = "Error doing relogin using keytab " + e.getMessage(); - LOG.error(msg, e); - throw new MetaException(msg); - } - } - } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java index bae1ec35b2..5ce340f360 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.utils; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier; import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector; import org.apache.hadoop.io.Text; @@ -267,4 +268,28 @@ private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTranspo sslSocket.setSSLParameters(sslParams); return new TSocket(sslSocket); } + + /** + * Relogin if login user is logged in using keytab + * Relogin is actually done by ugi code only if sufficient time has passed + * A no-op if kerberos security is not enabled + * @throws MetaException + */ + public static void reloginExpiringKeytabUser() throws MetaException { + if(!UserGroupInformation.isSecurityEnabled()){ + return; + } + try { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry + //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x) + if(ugi.isFromKeytab()){ + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (IOException e) { + String msg = "Error doing relogin using keytab " + e.getMessage(); + LOG.error(msg, e); + throw new MetaException(msg); + } + } }