diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 829791e0a9..4d81622a53 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -526,6 +526,22 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This is the base directory on the target/replica warehouse under which data for " + "external tables is stored. This is relative base path and hence prefixed to the source " + "external table path on target cluster."), + REPL_INCLUDE_AUTHORIZATION_METADATA("hive.repl.include.authorization.metadata", false, + "This configuration will enable replication of security and authorization related metadata along " + + "with the Hive data and metadata."), + REPL_AUTHORIZATION_PROVIDER_SERVICE("hive.repl.authorization.provider.service", "ranger", + "This configuration defines which service provides the security and authorization " + + "related metadata that needs to be replicated along " + + "with the Hive data and metadata. Set the configuration " + + "hive.repl.include.authorization.metadata to false to disable " + + "replication of security policies."), + REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT("hive.repl.authorization.provider.service.endpoint", + "", + "This configuration defines the authorization provider service endpoint"), + REPL_RANGER_SERVICE_NAME("hive.repl.ranger.service.name", + "cm_hive", + "This configuration defines the Ranger service name for which the authorization" + + " policies need to be replicated"), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index fa96b87417..641df005ed 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -424,11 +424,12 @@ public boolean validate(Task task) { return validator.hasTask(rootTask); } - private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple) throws Throwable { + private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump, + Tuple tuple) throws Throwable { HiveConf confTemp = new HiveConf(); confTemp.set("hive.repl.enable.move.optimization", "true"); Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); - ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb, + ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb, + null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId)); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext()); @@ -448,7 +449,7 @@ public void testTaskCreationOptimization() throws Throwable { Tuple dump = replDumpDb(dbName); //bootstrap load should not have move task - Task task = getReplLoadRootTask(dbNameReplica, false, dump); + Task task = getReplLoadRootTask(dbName, dbNameReplica, false, dump); assertEquals(false, hasMoveTask(task)); assertEquals(true, hasPartitionTask(task)); @@ -462,7 +463,7 @@
public void testTaskCreationOptimization() throws Throwable { // Partition level statistics gets updated as part of the INSERT above. So we see a partition // task corresponding to an ALTER_PARTITION event. - task = getReplLoadRootTask(dbNameReplica, true, dump); + task = getReplLoadRootTask(dbName, dbNameReplica, true, dump); assertEquals(true, hasMoveTask(task)); assertEquals(true, hasPartitionTask(task)); @@ -475,7 +476,7 @@ public void testTaskCreationOptimization() throws Throwable { dump = replDumpDb(dbName); //no move task should be added as the operation is adding a dynamic partition - task = getReplLoadRootTask(dbNameReplica, true, dump); + task = getReplLoadRootTask(dbName, dbNameReplica, true, dump); assertEquals(false, hasMoveTask(task)); assertEquals(true, hasPartitionTask(task)); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 1adc4fbc1f..901a4ede9a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -1493,4 +1493,72 @@ public Boolean apply(@Nullable CallerArguments args) { .run("select country from t2 order by country") .verifyResults(Arrays.asList("china", "india")); } + + /* + Can't test complete replication as mini ranger is not supported + Testing just the configs and no impact on existing replication + */ + @Test + public void testRangerReplication() throws Throwable { + List clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'", + "'hive.in.test'='true'", + "'hive.repl.authorization.provider.service.endpoint'='http://localhost:6080/ranger'"); + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)") + .dump(primaryDbName, clause); + + replica.load(replicatedDbName, primaryDbName, clause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + } + + /* + Can't test complete replication as mini ranger is not supported + Testing just the configs and no impact on existing replication + */ + @Test + public void testFailureRangerReplication() throws Throwable { + List clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'", + "'hive.in.test'='true'"); + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)"); + try { + primary.dump(primaryDbName, clause); + } catch (Exception e) { + assertEquals("Ranger endpoint is not valid. 
Please pass a valid config " + + "hive.repl.authorization.provider.service.endpoint", e.getMessage()); + } + } + + /* +Can't test complete replication as mini ranger is not supported +Testing just the configs and no impact on existing replication + */ + @Test + public void testFailureUnsupportedAuthorizerReplication() throws Throwable { + List clause = Arrays.asList("'hive.repl.include.authorization.metadata'='true'", + "'hive.in.test'='true'", "'hive.repl.authorization.provider.service'='sentry'"); + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)"); + try { + primary.dump(primaryDbName, clause); + } catch (SemanticException e) { + assertEquals("Authorizer sentry not supported for replication ", e.getMessage()); + } + } } diff --git a/ql/pom.xml b/ql/pom.xml index e08439269f..c22c72deaa 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -816,6 +816,11 @@ org.codehaus.janino janino + + com.sun.jersey.contribs + jersey-multipart + ${jersey.version} + diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 25d530cf35..57b028b711 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -26,7 +26,9 @@ REPL_TXN(15), REPL_INCREMENTAL_LOAD(16), SCHEDULED_QUERY_MAINT(17), - ACK(18); + ACK(18), + RANGER_DUMP(19), + RANGER_LOAD(20); private final int value; @@ -83,6 +85,12 @@ public static StageType findByValue(int value) { return REPL_INCREMENTAL_LOAD; case 17: return SCHEDULED_QUERY_MAINT; + case 18: + return ACK; + case 19: + return RANGER_DUMP; + case 20: + return RANGER_LOAD; default: return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index c82e8d2c1d..f0e54613fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -37,6 +37,10 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.DirCopyTask; import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork; +import org.apache.hadoop.hive.ql.exec.repl.RangerLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.RangerLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.RangerDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.RangerDumpTask; import org.apache.hadoop.hive.ql.exec.schq.ScheduledQueryMaintenanceTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; @@ -114,6 +118,8 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple(AckWork.class, AckTask.class)); + taskvec.add(new TaskTuple(RangerDumpWork.class, RangerDumpTask.class)); + taskvec.add(new TaskTuple(RangerLoadWork.class, RangerLoadTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class)); taskvec.add(new 
TaskTuple(DirCopyWork.class, DirCopyTask.class)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java new file mode 100644 index 0000000000..f9d3de7531 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy; +import org.apache.hadoop.hive.ql.exec.repl.ranger.NoOpRangerRestClient; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME; + +/** + * RangerDumpTask. + * + * Exports the Ranger security policies to staging directory. + **/ +public class RangerDumpTask extends Task implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RangerDumpTask.class); + + private transient RangerRestClient rangerRestClient; + + public RangerDumpTask() { + super(); + } + + @VisibleForTesting + RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) { + this.conf = conf; + this.work = work; + this.rangerRestClient = rangerRestClient; + } + + @Override + public String getName() { + return "RANGER_DUMP"; + } + + @Override + public int execute() { + try { + int exportCount = 0; + Path filePath = null; + LOG.info("Exporting Ranger Metadata"); + if (rangerRestClient == null) { + rangerRestClient = getRangerRestClient(); + } + String rangerEndpoint = conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT); + if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) { + throw new Exception("Ranger endpoint is not valid. 
" + + "Please pass a valid config hive.repl.authorization.provider.service.endpoint"); + } + String rangerHiveServiceName = conf.getVar(REPL_RANGER_SERVICE_NAME); + RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint, + work.getDbName(), rangerHiveServiceName); + List rangerPolicies = rangerExportPolicyList.getPolicies(); + if (rangerPolicies.isEmpty()) { + LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); + rangerExportPolicyList = new RangerExportPolicyList(); + } else { + rangerPolicies = rangerRestClient.removeMultiResourcePolicies(rangerPolicies); + } + if (!CollectionUtils.isEmpty(rangerPolicies)) { + rangerExportPolicyList.setPolicies(rangerPolicies); + filePath = rangerRestClient.saveRangerPoliciesToFile(rangerExportPolicyList, + work.getCurrentDumpPath(), ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, conf); + if (filePath != null) { + LOG.info("Ranger policy export finished successfully"); + exportCount = rangerExportPolicyList.getListSize(); + } + } + LOG.debug("Ranger policy export filePath:" + filePath); + LOG.info("Number of ranger policies exported {}", exportCount); + return 0; + } catch (Exception e) { + LOG.error("failed", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private RangerRestClient getRangerRestClient() { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + return new NoOpRangerRestClient(); + } + return new RangerRestClientImpl(); + } + + @Override + public StageType getType() { + return StageType.RANGER_DUMP; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java new file mode 100644 index 0000000000..873a2ecc35 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpWork.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain; + +import java.io.Serializable; + +/** + * RangerDumpWork. + * + * Export Ranger authorization policies. 
+ **/ +@Explain(displayName = "Ranger Dump Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class RangerDumpWork implements Serializable { + private static final long serialVersionUID = 1L; + private Path currentDumpPath; + private String dbName; + + public RangerDumpWork(Path currentDumpPath, String dbName) { + this.currentDumpPath = currentDumpPath; + this.dbName = dbName; + } + + public Path getCurrentDumpPath() { + return currentDumpPath; + } + + public String getDbName() { + return dbName; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java new file mode 100644 index 0000000000..5497d28eff --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl; +import org.apache.hadoop.hive.ql.exec.repl.ranger.NoOpRangerRestClient; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
+import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME; + +/** + * RangerLoadTask. + * + * Task to import Ranger authorization policies.
+ **/ +public class RangerLoadTask extends Task implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RangerLoadTask.class); + + private transient RangerRestClient rangerRestClient; + + public RangerLoadTask() { + super(); + } + + @VisibleForTesting + RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work) { + this.conf = conf; + this.work = work; + this.rangerRestClient = rangerRestClient; + } + + @Override + public String getName() { + return "RANGER_LOAD"; + } + + @Override + public int execute() { + try { + LOG.info("Importing Ranger Metadata"); + RangerExportPolicyList rangerExportPolicyList = null; + List rangerPolicies = null; + if (rangerRestClient == null) { + rangerRestClient = getRangerRestClient(); + } + String rangerEndpoint = conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT); + if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) { + throw new Exception("Ranger endpoint is not valid. " + + "Please pass a valid config hive.repl.authorization.provider.service.endpoint"); + } + if (work.getCurrentDumpPath() != null) { + LOG.info("Importing Ranger Metadata from {} ", work.getCurrentDumpPath()); + rangerExportPolicyList = rangerRestClient.readRangerPoliciesFromJsonFile(new Path(work.getCurrentDumpPath(), + ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME), conf); + if (rangerExportPolicyList != null && !CollectionUtils.isEmpty(rangerExportPolicyList.getPolicies())) { + rangerPolicies = rangerExportPolicyList.getPolicies(); + } + } + + if (CollectionUtils.isEmpty(rangerPolicies)) { + LOG.info("There are no ranger policies to import"); + rangerPolicies = new ArrayList<>(); + } + List updatedRangerPolicies = rangerRestClient.changeDataSet(rangerPolicies, work.getSourceDbName(), + work.getTargetDbName()); + int importCount = 0; + if (!CollectionUtils.isEmpty(updatedRangerPolicies)) { + if (rangerExportPolicyList == null) { + rangerExportPolicyList = new RangerExportPolicyList(); + } + rangerExportPolicyList.setPolicies(updatedRangerPolicies); + rangerRestClient.importRangerPolicies(rangerExportPolicyList, work.getTargetDbName(), rangerEndpoint, + conf.getVar(REPL_RANGER_SERVICE_NAME)); + LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize()); + importCount = rangerExportPolicyList.getListSize(); + LOG.info("Ranger policy import finished {} ", importCount); + } + return 0; + } catch (Exception e) { + LOG.error("Failed", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private RangerRestClient getRangerRestClient() { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { + return new NoOpRangerRestClient(); + } + return new RangerRestClientImpl(); + } + + @Override + public StageType getType() { + return StageType.RANGER_LOAD; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java new file mode 100644 index 0000000000..64f5df005f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadWork.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * RangerLoadWork. + * + * Work to import Ranger authorization policies. + **/ +@Explain(displayName = "Ranger Load Operator", explainLevels = { Explain.Level.USER, + Explain.Level.DEFAULT, + Explain.Level.EXTENDED }) +public class RangerLoadWork implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RangerLoadWork.class); + private Path currentDumpPath; + private String targetDbName; + private String sourceDbName; + + public RangerLoadWork(Path currentDumpPath, String sourceDbName, String targetDbName) { + this.currentDumpPath = currentDumpPath; + this.targetDbName = targetDbName; + this.sourceDbName = sourceDbName; + } + + public Path getCurrentDumpPath() { + return currentDumpPath; + } + + public String getTargetDbName() { + return targetDbName; + } + + public String getSourceDbName() { + return sourceDbName; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 055decdd49..79ebcd3164 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -99,6 +99,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; public class ReplDumpTask extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -144,6 +145,11 @@ public int execute() { if (shouldDump(previousValidHiveDumpPath)) { Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + work.setCurrentDumpPath(currentDumpPath); + if (shouldDumpAuthorizationMetadata()) { + LOG.info("Initiate authorization metadata dump provided by {} ", RANGER_AUTHORIZER); + initiateAuthorizationDumpTask(currentDumpPath); + } DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. 
ReplChangeManager.getInstance(conf); @@ -156,7 +162,6 @@ public int execute() { lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive()); } work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); - work.setCurrentDumpPath(currentDumpPath); initiateDataCopyTasks(); } else { LOG.info("Previous Dump is not yet loaded"); @@ -170,6 +175,26 @@ public int execute() { return 0; } + private void initiateAuthorizationDumpTask(Path currentDumpPath) throws SemanticException { + if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { + Path rangerDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_RANGER_BASE_DIR); + LOG.info("Exporting Authorization Metadata at {} ", rangerDumpRoot); + RangerDumpWork rangerDumpWork = new RangerDumpWork(rangerDumpRoot, work.dbNameOrPattern); + Task rangerDumpTask = TaskFactory.get(rangerDumpWork, conf); + if (childTasks == null) { + childTasks = new ArrayList<>(); + } + childTasks.add(rangerDumpTask); + } else { + throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) + + " not supported for replication "); + } + } + + private boolean shouldDumpAuthorizationMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); + } + private Path getEncodedDumpRootPath() throws UnsupportedEncodingException { return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() @@ -190,7 +215,9 @@ private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOExc private void initiateDataCopyTasks() throws SemanticException { TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS)); - List> childTasks = new ArrayList<>(); + if (childTasks == null) { + childTasks = new ArrayList<>(); + } childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf)); childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf)); if (childTasks.isEmpty()) { @@ -198,7 +225,6 @@ private void initiateDataCopyTasks() throws SemanticException { finishRemainingTasks(); } else { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); - this.childTasks = childTasks; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index b578d48ce1..f77616ac36 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -61,6 +62,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.api.StageType; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -71,6 +73,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; import static 
org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER; public class ReplLoadTask extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -97,206 +100,251 @@ public StageType getType() { @Override public int execute() { - Task rootTask = work.getRootTask(); - if (rootTask != null) { - rootTask.setChildTasks(null); + try { + Task rootTask = work.getRootTask(); + if (rootTask != null) { + rootTask.setChildTasks(null); + } + work.setRootTask(this); + this.parentTasks = null; + if (shouldLoadAuthorizationMetadata()) { + LOG.info("Loading authorization data provided by service {} ", RANGER_AUTHORIZER); + initiateAuthorizationLoadTask(work.dumpDirectory); + } + if (work.isIncrementalLoad()) { + return executeIncrementalLoad(); + } else { + return executeBootStrapLoad(); + } + } catch (RuntimeException e) { + LOG.error("replication failed with run time exception", e); + throw e; + } catch (Exception e) { + LOG.error("replication failed", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); } - work.setRootTask(this); - this.parentTasks = null; - if (work.isIncrementalLoad()) { - return executeIncrementalLoad(); + } + + private boolean shouldLoadAuthorizationMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); + } + + private void initiateAuthorizationLoadTask(String hiveDumpDirectory) throws SemanticException { + if (RANGER_AUTHORIZER.equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE))) { + Path rangerLoadRoot = new Path(new Path(hiveDumpDirectory).getParent(), ReplUtils.REPL_RANGER_BASE_DIR); + LOG.info("Adding Import Ranger Metadata Task from {} ", rangerLoadRoot); + RangerLoadWork rangerLoadWork = new RangerLoadWork(rangerLoadRoot, work.getSourceDbName(), work.dbNameToLoadIn); + Task rangerLoadTask = TaskFactory.get(rangerLoadWork, conf); + if (childTasks == null) { + childTasks = new ArrayList<>(); + } + childTasks.add(rangerLoadTask); } else { - return executeBootStrapLoad(); + throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) + + " not supported for replication "); } } - private int executeBootStrapLoad() { - try { - int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context loadContext = new Context(work.dumpDirectory, conf, getHive(), - work.sessionStateLineageState, context); - TaskTracker loadTaskTracker = new TaskTracker(maxTasks); - /* - for now for simplicity we are doing just one directory ( one database ), come back to use - of multiple databases once we have the basic flow to chain creating of tasks in place for - a database ( directory ) - */ - BootstrapEventsIterator iterator = work.bootstrapIterator(); - ConstraintEventsIterator constraintIterator = work.constraintsIterator(); + private int executeBootStrapLoad() throws Exception { + int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); + Context loadContext = new Context(work.dumpDirectory, conf, getHive(), + work.sessionStateLineageState, context); + TaskTracker loadTaskTracker = new TaskTracker(maxTasks); + /* + for now for simplicity we are doing just one directory ( one database ), come back to use + of multiple databases once we have the basic flow to chain creating of tasks in place for + a database ( directory ) + */ + BootstrapEventsIterator iterator = work.bootstrapIterator(); 
+ ConstraintEventsIterator constraintIterator = work.constraintsIterator(); + /* + This is used to get hold of a reference during the current creation of tasks and is initialized + with "0" tasks such that it will be non consequential in any operations done with task tracker + compositions. + */ + TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); + TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); + Scope scope = new Scope(); + boolean loadingConstraint = false; + if (!iterator.hasNext() && constraintIterator.hasNext()) { + loadingConstraint = true; + } + while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) + && loadTaskTracker.canAddMoreTasks()) { + BootstrapEvent next; + if (!loadingConstraint) { + next = iterator.next(); + } else { + next = constraintIterator.next(); + } + switch (next.eventType()) { + case Database: + DatabaseEvent dbEvent = (DatabaseEvent) next; + dbTracker = new LoadDatabase(loadContext, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks(); + loadTaskTracker.update(dbTracker); + if (work.hasDbState()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); + } else { + // Scope might have set to database in some previous iteration of loop, so reset it to false if database + // tracker has no tasks. + scope.database = false; + } + work.updateDbEventState(dbEvent.toState()); + if (dbTracker.hasTasks()) { + scope.rootTasks.addAll(dbTracker.tasks()); + scope.database = true; + } + dbTracker.debugLog("database"); + break; + case Table: { /* - This is used to get hold of a reference during the current creation of tasks and is initialized - with "0" tasks such that it will be non consequential in any operations done with task tracker - compositions. + Implicit assumption here is that database level is processed first before table level, + which will depend on the iterator used since it should provide the higher level directory + listing before providing the lower level listing. This is also required such that + the dbTracker / tableTracker are setup correctly always. 
*/ - TaskTracker dbTracker = new TaskTracker(ZERO_TASKS); - TaskTracker tableTracker = new TaskTracker(ZERO_TASKS); - Scope scope = new Scope(); - boolean loadingConstraint = false; - if (!iterator.hasNext() && constraintIterator.hasNext()) { - loadingConstraint = true; - } - while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) - && loadTaskTracker.canAddMoreTasks()) { - BootstrapEvent next; - if (!loadingConstraint) { - next = iterator.next(); + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); + FSTableEvent tableEvent = (FSTableEvent) next; + if (TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) { + tableTracker = new TaskTracker(1); + tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf)); } else { - next = constraintIterator.next(); + LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext, + loadTaskTracker); + tableTracker = loadTable.tasks(work.isIncrementalLoad()); } - switch (next.eventType()) { - case Database: - DatabaseEvent dbEvent = (DatabaseEvent) next; - dbTracker = new LoadDatabase(loadContext, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks(); - loadTaskTracker.update(dbTracker); - if (work.hasDbState()) { - loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); - } else { - // Scope might have set to database in some previous iteration of loop, so reset it to false if database - // tracker has no tasks. - scope.database = false; - } - work.updateDbEventState(dbEvent.toState()); - if (dbTracker.hasTasks()) { - scope.rootTasks.addAll(dbTracker.tasks()); - scope.database = true; - } - dbTracker.debugLog("database"); - break; - case Table: { - /* - Implicit assumption here is that database level is processed first before table level, - which will depend on the iterator used since it should provide the higher level directory - listing before providing the lower level listing. This is also required such that - the dbTracker / tableTracker are setup correctly always. - */ - TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); - FSTableEvent tableEvent = (FSTableEvent) next; - if (TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) { - tableTracker = new TaskTracker(1); - tableTracker.addTask(createViewTask(tableEvent.getMetaData(), work.dbNameToLoadIn, conf)); - } else { - LoadTable loadTable = new LoadTable(tableEvent, loadContext, iterator.replLogger(), tableContext, - loadTaskTracker); - tableTracker = loadTable.tasks(work.isIncrementalLoad()); - } - - setUpDependencies(dbTracker, tableTracker); - if (!scope.database && tableTracker.hasTasks()) { - scope.rootTasks.addAll(tableTracker.tasks()); - scope.table = true; - } else { - // Scope might have set to table in some previous iteration of loop, so reset it to false if table - // tracker has no tasks. - scope.table = false; - } - if (!TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) { - /* - for table replication if we reach the max number of tasks then for the next run we will - try to reload the same table again, this is mainly for ease of understanding the code - as then we can avoid handling == > loading partitions for the table given that - the creation of table lead to reaching max tasks vs, loading next table since current - one does not have partitions. 
- */ - - // for a table we explicitly try to load partitions as there is no separate partitions events. - LoadPartitions loadPartitions = - new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent, - work.dbNameToLoadIn, tableContext); - TaskTracker partitionsTracker = loadPartitions.tasks(); - partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, - partitionsTracker); - tableTracker.debugLog("table"); - partitionsTracker.debugLog("partitions for table"); - } - break; + setUpDependencies(dbTracker, tableTracker); + if (!scope.database && tableTracker.hasTasks()) { + scope.rootTasks.addAll(tableTracker.tasks()); + scope.table = true; + } else { + // Scope might have set to table in some previous iteration of loop, so reset it to false if table + // tracker has no tasks. + scope.table = false; } - case Partition: { - /* - This will happen only when loading tables and we reach the limit of number of tasks we can create; - hence we know here that the table should exist and there should be a lastPartitionName - */ - PartitionEvent event = (PartitionEvent) next; - TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); + + if (!TableType.VIRTUAL_VIEW.name().equals(tableEvent.getMetaData().getTable().getTableType())) { + /* + for table replication if we reach the max number of tasks then for the next run we will + try to reload the same table again, this is mainly for ease of understanding the code + as then we can avoid handling == > loading partitions for the table given that + the creation of table lead to reaching max tasks vs, loading next table since current + one does not have partitions. + */ + + // for a table we explicitly try to load partitions as there is no separate partitions events. LoadPartitions loadPartitions = - new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker, - event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); - /* - the tableTracker here should be a new instance and not an existing one as this can - only happen when we break in between loading partitions. 
- */ + new LoadPartitions(loadContext, iterator.replLogger(), loadTaskTracker, tableEvent, + work.dbNameToLoadIn, tableContext); TaskTracker partitionsTracker = loadPartitions.tasks(); partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, partitionsTracker); - partitionsTracker.debugLog("partitions"); - break; - } - case Function: { - LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(), - (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); - TaskTracker functionsTracker = loadFunction.tasks(); - if (!scope.database) { - scope.rootTasks.addAll(functionsTracker.tasks()); - } else { - setUpDependencies(dbTracker, functionsTracker); - } - loadTaskTracker.update(functionsTracker); - functionsTracker.debugLog("functions"); - break; - } - case Constraint: { - LoadConstraint loadConstraint = - new LoadConstraint(loadContext, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); - TaskTracker constraintTracker = loadConstraint.tasks(); - scope.rootTasks.addAll(constraintTracker.tasks()); - loadTaskTracker.update(constraintTracker); - constraintTracker.debugLog("constraints"); - } - } - - if (!loadingConstraint && !iterator.currentDbHasNext()) { - createEndReplLogTask(loadContext, scope, iterator.replLogger()); + tableTracker.debugLog("table"); + partitionsTracker.debugLog("partitions for table"); } + break; } - - boolean addAnotherLoadTask = iterator.hasNext() - || loadTaskTracker.hasReplicationState() - || constraintIterator.hasNext(); - - if (addAnotherLoadTask) { - createBuilderTask(scope.rootTasks); + case Partition: { + /* + This will happen only when loading tables and we reach the limit of number of tasks we can create; + hence we know here that the table should exist and there should be a lastPartitionName + */ + addLoadPartitionTasks(loadContext, next, dbTracker, iterator, scope, loadTaskTracker, tableTracker); + break; } - - // Update last repl ID of the database only if the current dump is not incremental. If bootstrap - // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change - // last repl ID of the database. - if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) { - loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); - work.updateDbEventState(null); + case Function: { + loadTaskTracker.update(addLoadFunctionTasks(loadContext, iterator, next, dbTracker, scope)); + break; } - this.childTasks = scope.rootTasks; - /* - Since there can be multiple rounds of this run all of which will be tied to the same - query id -- generated in compile phase , adding a additional UUID to the end to print each run - in separate files. 
- */ - LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); + case Constraint: { + loadTaskTracker.update(addLoadConstraintsTasks(loadContext, next, dbTracker, scope)); + } + default: + break; + } + if (!loadingConstraint && !iterator.currentDbHasNext()) { + createEndReplLogTask(loadContext, scope, iterator.replLogger()); + } + } + boolean addAnotherLoadTask = iterator.hasNext() + || loadTaskTracker.hasReplicationState() + || constraintIterator.hasNext(); - // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later - context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs()); - createReplLoadCompleteAckTask(); - } catch (RuntimeException e) { - LOG.error("replication failed with run time exception", e); - throw e; - } catch (Exception e) { - LOG.error("replication failed", e); - setException(e); - return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + if (addAnotherLoadTask) { + createBuilderTask(scope.rootTasks); } + // Update last repl ID of the database only if the current dump is not incremental. If bootstrap + // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change + // last repl ID of the database. + if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) { + loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope)); + work.updateDbEventState(null); + } + if (childTasks == null) { + childTasks = new ArrayList<>(); + } + childTasks.addAll(scope.rootTasks); + /* + Since there can be multiple rounds of this run all of which will be tied to the same + query id -- generated in compile phase , adding a additional UUID to the end to print each run + in separate files. + */ + LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); + // Populate the driver context with the scratch dir info from the repl context, so that the + // temp dirs will be cleaned up later + context.getFsScratchDirs().putAll(loadContext.pathInfo.getFsScratchDirs()); + createReplLoadCompleteAckTask(); LOG.info("completed load task run : {}", work.executedLoadTask()); return 0; } + private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker, + BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker, + TaskTracker tableTracker) throws Exception { + PartitionEvent event = (PartitionEvent) next; + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); + LoadPartitions loadPartitions = + new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker, + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); + /* + the tableTracker here should be a new instance and not an existing one as this can + only happen when we break in between loading partitions. 
+ */ + TaskTracker partitionsTracker = loadPartitions.tasks(); + partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, + partitionsTracker); + partitionsTracker.debugLog("partitions"); + return partitionsTracker; + } + + private TaskTracker addLoadConstraintsTasks(Context loadContext, + BootstrapEvent next, + TaskTracker dbTracker, + Scope scope) throws IOException, SemanticException { + LoadConstraint loadConstraint = + new LoadConstraint(loadContext, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker constraintTracker = loadConstraint.tasks(); + scope.rootTasks.addAll(constraintTracker.tasks()); + constraintTracker.debugLog("constraints"); + return constraintTracker; + } + + private TaskTracker addLoadFunctionTasks(Context loadContext, BootstrapEventsIterator iterator, BootstrapEvent next, + TaskTracker dbTracker, Scope scope) throws IOException, SemanticException { + LoadFunction loadFunction = new LoadFunction(loadContext, iterator.replLogger(), + (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); + TaskTracker functionsTracker = loadFunction.tasks(); + if (!scope.database) { + scope.rootTasks.addAll(functionsTracker.tasks()); + } else { + setUpDependencies(dbTracker, functionsTracker); + } + functionsTracker.debugLog("functions"); + return functionsTracker; + } + public static Task createViewTask(MetaData metaData, String dbNameToLoadIn, HiveConf conf) throws SemanticException { Table table = new Table(metaData.getTable()); @@ -304,7 +352,8 @@ a database ( directory ) TableName tableName = HiveTableName.ofNullable(table.getTableName(), dbName); String dbDotView = tableName.getNotEmptyDbTable(); CreateViewDesc desc = new CreateViewDesc(dbDotView, table.getAllCols(), null, table.getParameters(), - table.getPartColNames(), false, false, false, table.getSd().getInputFormat(), table.getSd().getOutputFormat(), + table.getPartColNames(), false, false, false, table.getSd().getInputFormat(), + table.getSd().getOutputFormat(), table.getSd().getSerdeInfo().getSerializationLib()); String originalText = table.getViewOriginalText(); String expandedText = table.getViewExpandedText(); @@ -326,6 +375,7 @@ a database ( directory ) /** * If replication policy is changed between previous and current load, then the excluded tables in * the new replication policy will be dropped. + * * @throws HiveException Failed to get/drop the tables. */ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveException { @@ -340,30 +390,30 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep // List all the tables that are excluded in the current repl scope. Iterable tableNames = Collections2.filter(db.getAllTables(dbName), tableName -> { - assert(tableName != null); + assert (tableName != null); return !tableName.toLowerCase().startsWith( - SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()) - && !replScope.tableIncludedInReplScope(tableName); + SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()) + && !replScope.tableIncludedInReplScope(tableName); }); for (String table : tableNames) { db.dropTable(dbName + "." 
+ table, true); } LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.", - dbName); + dbName); } private void createReplLoadCompleteAckTask() { if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks()) - || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { + || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) { //All repl load tasks are executed and status is 0, create the task to add the acknowledgement AckWork replLoadAckWork = new AckWork( - new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString())); + new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString())); Task loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf); - if (this.childTasks.isEmpty()) { + if (childTasks.isEmpty()) { this.childTasks.add(loadAckWorkTask); } else { DAGTraversal.traverse(this.childTasks, - new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask))); + new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask))); } } } @@ -374,7 +424,7 @@ private void createEndReplLogTask(Context context, Scope scope, if (work.isIncrementalLoad()) { dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), - work.incrementalLoadTasksBuilder().eventTo().toString()); + work.incrementalLoadTasksBuilder().eventTo().toString()); } else { Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); dbProps = dbInMetadata.getParameters(); @@ -392,7 +442,7 @@ private void createEndReplLogTask(Context context, Scope scope, /** * There was a database update done before and we want to make sure we update the last repl * id on this database as we are now going to switch to processing a new database. - * + *

* This has to be last task in the graph since if there are intermediate tasks and the last.repl.id * is a root level task then in the execution phase the root level tasks will get executed first, * however if any of the child tasks of the bootstrap load failed then even though the bootstrap has failed @@ -416,8 +466,8 @@ private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scop } private void partitionsPostProcessing(BootstrapEventsIterator iterator, - Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker, - TaskTracker partitionsTracker) { + Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker, + TaskTracker partitionsTracker) { setUpDependencies(tableTracker, partitionsTracker); if (!scope.database && !scope.table) { scope.rootTasks.addAll(partitionsTracker.tasks()); @@ -465,7 +515,7 @@ private int executeIncrementalLoad() { if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) { if (work.hasBootstrapLoadTasks()) { LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap " - + "mode after applying all events."); + + "mode after applying all events."); return executeBootStrapLoad(); } } @@ -502,10 +552,10 @@ private int executeIncrementalLoad() { mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid); AlterDatabaseSetPropertiesDesc alterDbDesc = - new AlterDatabaseSetPropertiesDesc(dbName, mapProp, - new ReplicationSpec(lastEventid, lastEventid)); + new AlterDatabaseSetPropertiesDesc(dbName, mapProp, + new ReplicationSpec(lastEventid, lastEventid)); Task updateReplIdTask = - TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf); + TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf); DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask)); work.setLastReplIDUpdated(true); @@ -517,7 +567,10 @@ private int executeIncrementalLoad() { if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) { DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf))); } - this.childTasks = childTasks; + if (this.childTasks == null) { + this.childTasks = new ArrayList<>(); + } + this.childTasks.addAll(childTasks); createReplLoadCompleteAckTask(); return 0; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index f072effc31..26cd59b082 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -44,6 +44,7 @@ final ReplScope currentReplScope; final String dumpDirectory; private boolean lastReplIDUpdated; + private String sourceDbName; private final ConstraintEventsIterator constraintsIterator; private int loadTaskRunCount = 0; @@ -60,12 +61,13 @@ final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, - String dbNameToLoadIn, ReplScope currentReplScope, + String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope, LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException { sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; this.currentReplScope = currentReplScope; + this.sourceDbName = sourceDbName; // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name. 
if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) { @@ -152,4 +154,8 @@ public boolean isLastReplIDUpdated() { public void setLastReplIDUpdated(boolean lastReplIDUpdated) { this.lastReplIDUpdated = lastReplIDUpdated; } + + public String getSourceDbName() { + return sourceDbName; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java new file mode 100644 index 0000000000..4e3fa61c42 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.List; + +/** + * NoOpRangerRestClient returns empty policies. + */ +public class NoOpRangerRestClient implements RangerRestClient { + + @Override + public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, + String dbName, String rangerHiveServiceName) { + return new RangerExportPolicyList(); + } + + @Override + public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, + String baseUrl, + String rangerHiveServiceName) throws Exception { + return null; + } + + @Override + public List removeMultiResourcePolicies(List rangerPolicies) { + return null; + } + + @Override + public List changeDataSet(List rangerPolicies, String sourceDbName, + String targetDbName) { + return null; + } + + @Override + public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath, + String fileName, HiveConf conf) throws Exception { + return null; + } + + @Override + public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, HiveConf conf) throws SemanticException { + return null; + } + + @Override + public boolean checkConnection(String url) throws Exception { + return true; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java new file mode 100644 index 0000000000..2f999afbbb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerBaseModelObject.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Date; + +/** + * RangerBaseModelObject class to contain common attributes of Ranger Base object. + */ +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, + fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class RangerBaseModelObject implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private Long id; + private String guid; + private Boolean isEnabled; + private String createdBy; + private String updatedBy; + private Date createTime; + private Date updateTime; + private Long version; + + public RangerBaseModelObject() { + setIsEnabled(null); + } + + public void updateFrom(RangerBaseModelObject other) { + setIsEnabled(other.getIsEnabled()); + } + + /** + * @return the id + */ + public Long getId() { + return id; + } + + /** + * @param id the id to set + */ + public void setId(Long id) { + this.id = id; + } + + /** + * @return the guid + */ + public String getGuid() { + return guid; + } + + /** + * @param guid the guid to set + */ + public void setGuid(String guid) { + this.guid = guid; + } + + /** + * @return the isEnabled + */ + public Boolean getIsEnabled() { + return isEnabled; + } + + /** + * @param isEnabled the isEnabled to set + */ + public void setIsEnabled(Boolean isEnabled) { + this.isEnabled = isEnabled == null ? 
Boolean.TRUE : isEnabled; + } + + /** + * @return the createdBy + */ + public String getCreatedBy() { + return createdBy; + } + + /** + * @param createdBy the createdBy to set + */ + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + /** + * @return the updatedBy + */ + public String getUpdatedBy() { + return updatedBy; + } + + /** + * @param updatedBy the updatedBy to set + */ + public void setUpdatedBy(String updatedBy) { + this.updatedBy = updatedBy; + } + + /** + * @return the createTime + */ + public Date getCreateTime() { + return new Date(createTime.getTime()); + } + + /** + * @param createTime the createTime to set + */ + public void setCreateTime(Date createTime) { + this.createTime = new Date(createTime.getTime()); + } + + /** + * @return the updateTime + */ + public Date getUpdateTime() { + return new Date(updateTime.getTime()); + } + + /** + * @param updateTime the updateTime to set + */ + public void setUpdateTime(Date updateTime) { + this.updateTime = new Date(updateTime.getTime()); + } + + /** + * @return the version + */ + public Long getVersion() { + return version; + } + + /** + * @param version the version to set + */ + public void setVersion(Long version) { + this.version = version; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("id={").append(id).append("} "); + sb.append("guid={").append(guid).append("} "); + sb.append("isEnabled={").append(isEnabled).append("} "); + sb.append("createdBy={").append(createdBy).append("} "); + sb.append("updatedBy={").append(updatedBy).append("} "); + sb.append("createTime={").append(createTime).append("} "); + sb.append("updateTime={").append(updateTime).append("} "); + sb.append("version={").append(version).append("} "); + + return sb; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java new file mode 100644 index 0000000000..a395feb055 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerExportPolicyList.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * RangerExportPolicyList class to extends RangerPolicyList class. + */ +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, + fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class RangerExportPolicyList extends RangerPolicyList implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private Map metaDataInfo = new LinkedHashMap(); + + public Map getMetaDataInfo() { + return metaDataInfo; + } + + public void setMetaDataInfo(Map metaDataInfo) { + this.metaDataInfo = metaDataInfo; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java new file mode 100644 index 0000000000..733a8983a4 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicy.java @@ -0,0 +1,1513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * RangerPolicy class to contain Ranger Policy details. 
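+ * <p>
+ * As a rough illustration only (the names "hiveServiceName" and "sales_db" are placeholders,
+ * not values used anywhere in the replication flow), a database-level access policy could be
+ * assembled with this class as follows:
+ * <pre>
+ *   RangerPolicy policy = new RangerPolicy();
+ *   policy.setService("hiveServiceName");
+ *   policy.setName("sales_db-all");
+ *   policy.setPolicyType(POLICY_TYPE_ACCESS);
+ *   // "database" is the resource key this patch looks up elsewhere; the single-value
+ *   // RangerPolicyResource constructor leaves isExcludes/isRecursive at their FALSE defaults.
+ *   policy.setResources(java.util.Collections.singletonMap("database",
+ *       new RangerPolicyResource("sales_db")));
+ * </pre>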
+ */ +@JsonAutoDetect(fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class RangerPolicy extends RangerBaseModelObject implements java.io.Serializable { + public static final int POLICY_TYPE_ACCESS = 0; + public static final int POLICY_TYPE_DATAMASK = 1; + public static final int POLICY_TYPE_ROWFILTER = 2; + + public static final int[] POLICY_TYPES = new int[]{ + POLICY_TYPE_ACCESS, + POLICY_TYPE_DATAMASK, + POLICY_TYPE_ROWFILTER, + }; + + public static final String MASK_TYPE_NULL = "MASK_NULL"; + public static final String MASK_TYPE_NONE = "MASK_NONE"; + public static final String MASK_TYPE_CUSTOM = "CUSTOM"; + + private static final long serialVersionUID = 1L; + + private String service; + private String name; + private Integer policyType; + private String description; + private String resourceSignature; + private Boolean isAuditEnabled; + private Map resources; + private List policyItems; + private List denyPolicyItems; + private List allowExceptions; + private List denyExceptions; + private List dataMaskPolicyItems; + private List rowFilterPolicyItems; + + + /** + * Ranger Policy default constructor. + */ + public RangerPolicy() { + this(null, null, null, null, null, null, null); + } + + /** + * @param service + * @param name + * @param policyType + * @param description + * @param resources + * @param policyItems + * @param resourceSignature TODO + */ + public RangerPolicy(String service, String name, Integer policyType, String description, + Map resources, List policyItems, + String resourceSignature) { + super(); + setService(service); + setName(name); + setPolicyType(policyType); + setDescription(description); + setResourceSignature(resourceSignature); + setIsAuditEnabled(null); + setResources(resources); + setPolicyItems(policyItems); + setDenyPolicyItems(null); + setAllowExceptions(null); + setDenyExceptions(null); + setDataMaskPolicyItems(null); + setRowFilterPolicyItems(null); + } + + /** + * @param other + */ + public void updateFrom(RangerPolicy other) { + super.updateFrom(other); + + setService(other.getService()); + setName(other.getName()); + setPolicyType(other.getPolicyType()); + setDescription(other.getDescription()); + setResourceSignature(other.getResourceSignature()); + setIsAuditEnabled(other.getIsAuditEnabled()); + setResources(other.getResources()); + setPolicyItems(other.getPolicyItems()); + setDenyPolicyItems(other.getDenyPolicyItems()); + setAllowExceptions(other.getAllowExceptions()); + setDenyExceptions(other.getDenyExceptions()); + setDataMaskPolicyItems(other.getDataMaskPolicyItems()); + setRowFilterPolicyItems(other.getRowFilterPolicyItems()); + } + + /** + * @return the type + */ + public String getService() { + return service; + } + + /** + * @param service the type to set + */ + public void setService(String service) { + this.service = service; + } + + /** + * @return the name + */ + public String getName() { + return name; + } + + /** + * @param name the name to set + */ + public void setName(String name) { + this.name = name; + } + + /** + * @return the policyType + */ + public Integer getPolicyType() { + return policyType; + } + + /** + * @param policyType the policyType to set + */ + public void setPolicyType(Integer policyType) { + this.policyType = policyType; + } + + /** + * @return the description + */ + public String getDescription() { + return description; + } + + /** + * @param description 
the description to set + */ + public void setDescription(String description) { + this.description = description; + } + + /** + * @return the resourceSignature + */ + public String getResourceSignature() { + return resourceSignature; + } + + /** + * @param resourceSignature the resourceSignature to set + */ + public void setResourceSignature(String resourceSignature) { + this.resourceSignature = resourceSignature; + } + + /** + * @return the isAuditEnabled + */ + public Boolean getIsAuditEnabled() { + return isAuditEnabled; + } + + /** + * @param isAuditEnabled the isEnabled to set + */ + public void setIsAuditEnabled(Boolean isAuditEnabled) { + this.isAuditEnabled = isAuditEnabled == null ? Boolean.TRUE : isAuditEnabled; + } + + /** + * @return the resources + */ + public Map getResources() { + return resources; + } + + /** + * @param resources the resources to set + */ + public void setResources(Map resources) { + if (this.resources == null) { + this.resources = new HashMap<>(); + } + + if (this.resources == resources) { + return; + } + + this.resources.clear(); + + if (resources != null) { + for (Map.Entry e : resources.entrySet()) { + this.resources.put(e.getKey(), e.getValue()); + } + } + } + + /** + * @return the policyItems + */ + public List getPolicyItems() { + return policyItems; + } + + /** + * @param policyItems the policyItems to set + */ + public void setPolicyItems(List policyItems) { + if (this.policyItems == null) { + this.policyItems = new ArrayList<>(); + } + + if (this.policyItems == policyItems) { + return; + } + + this.policyItems.clear(); + + if (policyItems != null) { + this.policyItems.addAll(policyItems); + } + } + + /** + * @return the denyPolicyItems + */ + public List getDenyPolicyItems() { + return denyPolicyItems; + } + + /** + * @param denyPolicyItems the denyPolicyItems to set + */ + public void setDenyPolicyItems(List denyPolicyItems) { + if (this.denyPolicyItems == null) { + this.denyPolicyItems = new ArrayList<>(); + } + + if (this.denyPolicyItems == denyPolicyItems) { + return; + } + + this.denyPolicyItems.clear(); + + if (denyPolicyItems != null) { + this.denyPolicyItems.addAll(denyPolicyItems); + } + } + + /** + * @return the allowExceptions + */ + public List getAllowExceptions() { + return allowExceptions; + } + + /** + * @param allowExceptions the allowExceptions to set + */ + public void setAllowExceptions(List allowExceptions) { + if (this.allowExceptions == null) { + this.allowExceptions = new ArrayList<>(); + } + + if (this.allowExceptions == allowExceptions) { + return; + } + + this.allowExceptions.clear(); + + if (allowExceptions != null) { + this.allowExceptions.addAll(allowExceptions); + } + } + + /** + * @return the denyExceptions + */ + public List getDenyExceptions() { + return denyExceptions; + } + + /** + * @param denyExceptions the denyExceptions to set + */ + public void setDenyExceptions(List denyExceptions) { + if (this.denyExceptions == null) { + this.denyExceptions = new ArrayList<>(); + } + + if (this.denyExceptions == denyExceptions) { + return; + } + + this.denyExceptions.clear(); + + if (denyExceptions != null) { + this.denyExceptions.addAll(denyExceptions); + } + } + + public List getDataMaskPolicyItems() { + return dataMaskPolicyItems; + } + + public void setDataMaskPolicyItems(List dataMaskPolicyItems) { + if (this.dataMaskPolicyItems == null) { + this.dataMaskPolicyItems = new ArrayList<>(); + } + + if (this.dataMaskPolicyItems == dataMaskPolicyItems) { + return; + } + + this.dataMaskPolicyItems.clear(); + + if 
(dataMaskPolicyItems != null) { + this.dataMaskPolicyItems.addAll(dataMaskPolicyItems); + } + } + + public List getRowFilterPolicyItems() { + return rowFilterPolicyItems; + } + + public void setRowFilterPolicyItems(List rowFilterPolicyItems) { + if (this.rowFilterPolicyItems == null) { + this.rowFilterPolicyItems = new ArrayList<>(); + } + + if (this.rowFilterPolicyItems == rowFilterPolicyItems) { + return; + } + + this.rowFilterPolicyItems.clear(); + + if (rowFilterPolicyItems != null) { + this.rowFilterPolicyItems.addAll(rowFilterPolicyItems); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicy={"); + + super.toString(sb); + + sb.append("service={").append(service).append("} "); + sb.append("name={").append(name).append("} "); + sb.append("policyType={").append(policyType).append("} "); + sb.append("description={").append(description).append("} "); + sb.append("resourceSignature={").append(resourceSignature).append("} "); + sb.append("isAuditEnabled={").append(isAuditEnabled).append("} "); + + sb.append("resources={"); + if (resources != null) { + for (Map.Entry e : resources.entrySet()) { + sb.append(e.getKey()).append("={"); + e.getValue().toString(sb); + sb.append("} "); + } + } + sb.append("} "); + + sb.append("policyItems={"); + if (policyItems != null) { + for (RangerPolicyItem policyItem : policyItems) { + if (policyItem != null) { + policyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("denyPolicyItems={"); + if (denyPolicyItems != null) { + for (RangerPolicyItem policyItem : denyPolicyItems) { + if (policyItem != null) { + policyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("allowExceptions={"); + if (allowExceptions != null) { + for (RangerPolicyItem policyItem : allowExceptions) { + if (policyItem != null) { + policyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("denyExceptions={"); + if (denyExceptions != null) { + for (RangerPolicyItem policyItem : denyExceptions) { + if (policyItem != null) { + policyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("dataMaskPolicyItems={"); + if (dataMaskPolicyItems != null) { + for (RangerDataMaskPolicyItem dataMaskPolicyItem : dataMaskPolicyItems) { + if (dataMaskPolicyItem != null) { + dataMaskPolicyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("rowFilterPolicyItems={"); + if (rowFilterPolicyItems != null) { + for (RangerRowFilterPolicyItem rowFilterPolicyItem : rowFilterPolicyItems) { + if (rowFilterPolicyItem != null) { + rowFilterPolicyItem.toString(sb); + } + } + } + sb.append("} "); + + sb.append("}"); + + return sb; + } + + /** + * RangerPolicyResource class to store the resource path values. 
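+ * <p>
+ * For example (hypothetical value), {@code new RangerPolicyResource("sales_db")} wraps a single
+ * database name; when left unset, isExcludes and isRecursive default to {@code Boolean.FALSE}.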
+ */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyResource implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private List values; + private Boolean isExcludes; + private Boolean isRecursive; + + public RangerPolicyResource() { + this((List) null, null, null); + } + + public RangerPolicyResource(String value) { + setValue(value); + setIsExcludes(null); + setIsRecursive(null); + } + + public RangerPolicyResource(String value, Boolean isExcludes, Boolean isRecursive) { + setValue(value); + setIsExcludes(isExcludes); + setIsRecursive(isRecursive); + } + + public RangerPolicyResource(List values, Boolean isExcludes, Boolean isRecursive) { + setValues(values); + setIsExcludes(isExcludes); + setIsRecursive(isRecursive); + } + + /** + * @return the values + */ + public List getValues() { + return values; + } + + /** + * @param values the values to set + */ + public void setValues(List values) { + if (this.values == null) { + this.values = new ArrayList<>(); + } + if (this.values == values) { + return; + } + this.values.clear(); + if (values != null) { + this.values.addAll(values); + } + } + + /** + * @param value the value to set + */ + public void setValue(String value) { + if (this.values == null) { + this.values = new ArrayList<>(); + } + this.values.clear(); + this.values.add(value); + } + + /** + * @return the isExcludes + */ + public Boolean getIsExcludes() { + return isExcludes; + } + + /** + * @param isExcludes the isExcludes to set + */ + public void setIsExcludes(Boolean isExcludes) { + this.isExcludes = isExcludes == null ? Boolean.FALSE : isExcludes; + } + + /** + * @return the isRecursive + */ + public Boolean getIsRecursive() { + return isRecursive; + } + + /** + * @param isRecursive the isRecursive to set + */ + public void setIsRecursive(Boolean isRecursive) { + this.isRecursive = isRecursive == null ? Boolean.FALSE : isRecursive; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyResource={"); + sb.append("values={"); + if (values != null) { + for (String value : values) { + sb.append(value).append(" "); + } + } + sb.append("} "); + sb.append("isExcludes={").append(isExcludes).append("} "); + sb.append("isRecursive={").append(isRecursive).append("} "); + sb.append("}"); + + return sb; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((isExcludes == null) ? 0 : isExcludes.hashCode()); + result = prime * result + + ((isRecursive == null) ? 0 : isRecursive.hashCode()); + result = prime * result + + ((values == null) ? 
0 : values.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyResource other = (RangerPolicyResource) obj; + if (isExcludes == null) { + if (other.isExcludes != null) { + return false; + } + } else if (!isExcludes.equals(other.isExcludes)) { + return false; + } + if (isRecursive == null) { + if (other.isRecursive != null) { + return false; + } + } else if (!isRecursive.equals(other.isRecursive)) { + return false; + } + if (values == null) { + if (other.values != null) { + return false; + } + } else if (!values.equals(other.values)) { + return false; + } + return true; + } + } + + /** + * RangerPolicyItem class contains ranger policy items like access and permissions. + */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyItem implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private List accesses; + private List users; + private List groups; + private List conditions; + private Boolean delegateAdmin; + + public RangerPolicyItem() { + this(null, null, null, null, null); + } + + public RangerPolicyItem(List accessTypes, List users, List groups, + List conditions, Boolean delegateAdmin) { + setAccesses(accessTypes); + setUsers(users); + setGroups(groups); + setConditions(conditions); + setDelegateAdmin(delegateAdmin); + } + + /** + * @return the accesses + */ + public List getAccesses() { + return accesses; + } + + /** + * @param accesses the accesses to set + */ + public void setAccesses(List accesses) { + if (this.accesses == null) { + this.accesses = new ArrayList<>(); + } + + if (this.accesses == accesses) { + return; + } + + this.accesses.clear(); + + if (accesses != null) { + this.accesses.addAll(accesses); + } + } + + /** + * @return the users + */ + public List getUsers() { + return users; + } + + /** + * @param users the users to set + */ + public void setUsers(List users) { + if (this.users == null) { + this.users = new ArrayList<>(); + } + + if (this.users == users) { + return; + } + + this.users.clear(); + + if (users != null) { + this.users.addAll(users); + } + } + + /** + * @return the groups + */ + public List getGroups() { + return groups; + } + + /** + * @param groups the groups to set + */ + public void setGroups(List groups) { + if (this.groups == null) { + this.groups = new ArrayList<>(); + } + if (this.groups == groups) { + return; + } + this.groups.clear(); + if (groups != null) { + this.groups.addAll(groups); + } + } + + /** + * @return the conditions + */ + public List getConditions() { + return conditions; + } + + /** + * @param conditions the conditions to set + */ + public void setConditions(List conditions) { + if (this.conditions == null) { + this.conditions = new ArrayList<>(); + } + if (this.conditions == conditions) { + return; + } + this.conditions.clear(); + if (conditions != null) { + this.conditions.addAll(conditions); + } + } + + /** + * @return the delegateAdmin + */ + public Boolean getDelegateAdmin() { + return delegateAdmin; + } + + /** + * @param delegateAdmin the delegateAdmin to set + */ + public void setDelegateAdmin(Boolean delegateAdmin) { + this.delegateAdmin = delegateAdmin == null ? 
Boolean.FALSE : delegateAdmin; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyItem={"); + sb.append("accessTypes={"); + if (accesses != null) { + for (RangerPolicyItemAccess access : accesses) { + if (access != null) { + access.toString(sb); + } + } + } + sb.append("} "); + sb.append("users={"); + if (users != null) { + for (String user : users) { + if (user != null) { + sb.append(user).append(" "); + } + } + } + sb.append("} "); + sb.append("groups={"); + if (groups != null) { + for (String group : groups) { + if (group != null) { + sb.append(group).append(" "); + } + } + } + sb.append("} "); + sb.append("conditions={"); + if (conditions != null) { + for (RangerPolicyItemCondition condition : conditions) { + if (condition != null) { + condition.toString(sb); + } + } + } + sb.append("} "); + sb.append("delegateAdmin={").append(delegateAdmin).append("} "); + sb.append("}"); + + return sb; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((accesses == null) ? 0 : accesses.hashCode()); + result = prime * result + + ((conditions == null) ? 0 : conditions.hashCode()); + result = prime * result + + ((delegateAdmin == null) ? 0 : delegateAdmin.hashCode()); + result = prime * result + + ((groups == null) ? 0 : groups.hashCode()); + result = prime * result + ((users == null) ? 0 : users.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyItem other = (RangerPolicyItem) obj; + if (accesses == null) { + if (other.accesses != null) { + return false; + } + } else if (!accesses.equals(other.accesses)) { + return false; + } + if (conditions == null) { + if (other.conditions != null) { + return false; + } + } else if (!conditions.equals(other.conditions)) { + return false; + } + if (delegateAdmin == null) { + if (other.delegateAdmin != null) { + return false; + } + } else if (!delegateAdmin.equals(other.delegateAdmin)) { + return false; + } + if (groups == null) { + if (other.groups != null) { + return false; + } + } else if (!groups.equals(other.groups)) { + return false; + } + if (users == null) { + if (other.users != null) { + return false; + } + } else if (!users.equals(other.users)) { + return false; + } + return true; + + } + } + + /** + * RangerDataMaskPolicyItem class. 
+ */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerDataMaskPolicyItem extends RangerPolicyItem implements java.io.Serializable { + private static final long serialVersionUID = 1L; + private RangerPolicyItemDataMaskInfo dataMaskInfo; + + public RangerDataMaskPolicyItem() { + this(null, null, null, null, null, null); + } + + public RangerDataMaskPolicyItem(List accesses, + RangerPolicyItemDataMaskInfo dataMaskDetail, List users, + List groups, + List conditions, Boolean delegateAdmin) { + super(accesses, users, groups, conditions, delegateAdmin); + setDataMaskInfo(dataMaskDetail); + } + + /** + * @return the dataMaskInfo + */ + public RangerPolicyItemDataMaskInfo getDataMaskInfo() { + return dataMaskInfo; + } + + /** + * @param dataMaskInfo the dataMaskInfo to set + */ + public void setDataMaskInfo(RangerPolicyItemDataMaskInfo dataMaskInfo) { + this.dataMaskInfo = dataMaskInfo == null ? new RangerPolicyItemDataMaskInfo() : dataMaskInfo; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((dataMaskInfo == null) ? 0 : dataMaskInfo.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerDataMaskPolicyItem other = (RangerDataMaskPolicyItem) obj; + if (dataMaskInfo == null) { + if (other.dataMaskInfo != null) { + return false; + } + } else if (!dataMaskInfo.equals(other.dataMaskInfo)) { + return false; + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerDataMaskPolicyItem={"); + super.toString(sb); + sb.append("dataMaskInfo={"); + if (dataMaskInfo != null) { + dataMaskInfo.toString(sb); + } + sb.append("} "); + sb.append("}"); + return sb; + } + } + + /** + * RangerRowFilterPolicyItem class. + */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerRowFilterPolicyItem extends RangerPolicyItem implements java.io.Serializable { + private static final long serialVersionUID = 1L; + private RangerPolicyItemRowFilterInfo rowFilterInfo; + + public RangerRowFilterPolicyItem() { + this(null, null, null, null, null, null); + } + + public RangerRowFilterPolicyItem(RangerPolicyItemRowFilterInfo rowFilterInfo, + List accesses, List users, List groups, + List conditions, Boolean delegateAdmin) { + super(accesses, users, groups, conditions, delegateAdmin); + setRowFilterInfo(rowFilterInfo); + } + + /** + * @return the rowFilterInfo + */ + public RangerPolicyItemRowFilterInfo getRowFilterInfo() { + return rowFilterInfo; + } + + /** + * @param rowFilterInfo the rowFilterInfo to set + */ + public void setRowFilterInfo(RangerPolicyItemRowFilterInfo rowFilterInfo) { + this.rowFilterInfo = rowFilterInfo == null ? 
new RangerPolicyItemRowFilterInfo() : rowFilterInfo; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((rowFilterInfo == null) ? 0 : rowFilterInfo.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerRowFilterPolicyItem other = (RangerRowFilterPolicyItem) obj; + if (rowFilterInfo == null) { + if (other.rowFilterInfo != null) { + return false; + } + } else if (!rowFilterInfo.equals(other.rowFilterInfo)) { + return false; + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerRowFilterPolicyItem={"); + super.toString(sb); + sb.append("rowFilterInfo={"); + if (rowFilterInfo != null) { + rowFilterInfo.toString(sb); + } + sb.append("} "); + sb.append("}"); + return sb; + } + } + + /** + * RangerPolicyItemAccess class. + */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyItemAccess implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private String type; + private Boolean isAllowed; + + public RangerPolicyItemAccess() { + this(null, null); + } + + public RangerPolicyItemAccess(String type) { + this(type, null); + } + + public RangerPolicyItemAccess(String type, Boolean isAllowed) { + setType(type); + setIsAllowed(isAllowed); + } + + /** + * @return the type + */ + public String getType() { + return type; + } + + /** + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * @return the isAllowed + */ + public Boolean getIsAllowed() { + return isAllowed; + } + + /** + * @param isAllowed the isAllowed to set + */ + public void setIsAllowed(Boolean isAllowed) { + this.isAllowed = isAllowed == null ? Boolean.TRUE : isAllowed; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyItemAccess={"); + sb.append("type={").append(type).append("} "); + sb.append("isAllowed={").append(isAllowed).append("} "); + sb.append("}"); + return sb; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((isAllowed == null) ? 0 : isAllowed.hashCode()); + result = prime * result + ((type == null) ? 
0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyItemAccess other = (RangerPolicyItemAccess) obj; + if (isAllowed == null) { + if (other.isAllowed != null) { + return false; + } + } else if (!isAllowed.equals(other.isAllowed)) { + return false; + } + if (type == null) { + if (other.type != null) { + return false; + } + } else if (!type.equals(other.type)) { + return false; + } + return true; + } + } + + /** + * RangerPolicyItemCondition class to store policy conditions. + */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyItemCondition implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private String type; + private List values; + + public RangerPolicyItemCondition() { + this(null, null); + } + + public RangerPolicyItemCondition(String type, List values) { + setType(type); + setValues(values); + } + + /** + * @return the type + */ + public String getType() { + return type; + } + + /** + * @param type the type to set + */ + public void setType(String type) { + this.type = type; + } + + /** + * @return the value + */ + public List getValues() { + return values; + } + + /** + * @param values the value to set + */ + public void setValues(List values) { + if (this.values == null) { + this.values = new ArrayList<>(); + } + if (this.values == values) { + return; + } + this.values.clear(); + if (values != null) { + this.values.addAll(values); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyItemCondition={"); + sb.append("type={").append(type).append("} "); + sb.append("values={"); + if (values != null) { + for (String value : values) { + sb.append(value).append(" "); + } + } + sb.append("} "); + sb.append("}"); + + return sb; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((type == null) ? 0 : type.hashCode()); + result = prime * result + ((values == null) ? 0 : values.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyItemCondition other = (RangerPolicyItemCondition) obj; + if (type == null) { + if (other.type != null) { + return false; + } + } else if (!type.equals(other.type)) { + return false; + } + if (values == null) { + if (other.values != null) { + return false; + } + } else if (!values.equals(other.values)) { + return false; + } + return true; + } + } + + /** + * RangerPolicyItemDataMaskInfo store policy having datamasking. 
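+ * <p>
+ * For example, {@code new RangerPolicyItemDataMaskInfo(RangerPolicy.MASK_TYPE_NULL, null, null)}
+ * describes a "mask with NULL" rule; the mask type constants are declared on the enclosing
+ * {@link RangerPolicy} class.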
+ */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyItemDataMaskInfo implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private String dataMaskType; + private String conditionExpr; + private String valueExpr; + + public RangerPolicyItemDataMaskInfo() { + } + + public RangerPolicyItemDataMaskInfo(String dataMaskType, String conditionExpr, String valueExpr) { + setDataMaskType(dataMaskType); + setConditionExpr(conditionExpr); + setValueExpr(valueExpr); + } + + public String getDataMaskType() { + return dataMaskType; + } + + public void setDataMaskType(String dataMaskType) { + this.dataMaskType = dataMaskType; + } + + public String getConditionExpr() { + return conditionExpr; + } + + public void setConditionExpr(String conditionExpr) { + this.conditionExpr = conditionExpr; + } + + public String getValueExpr() { + return valueExpr; + } + + public void setValueExpr(String valueExpr) { + this.valueExpr = valueExpr; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((dataMaskType == null) ? 0 : dataMaskType.hashCode()); + result = prime * result + ((conditionExpr == null) ? 0 : conditionExpr.hashCode()); + result = prime * result + ((valueExpr == null) ? 0 : valueExpr.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyItemDataMaskInfo other = (RangerPolicyItemDataMaskInfo) obj; + if (dataMaskType == null) { + if (other.dataMaskType != null) { + return false; + } + } else if (!dataMaskType.equals(other.dataMaskType)) { + return false; + } + if (conditionExpr == null) { + if (other.conditionExpr != null) { + return false; + } + } else if (!conditionExpr.equals(other.conditionExpr)) { + return false; + } + if (valueExpr == null) { + if (other.valueExpr != null) { + return false; + } + } else if (!valueExpr.equals(other.valueExpr)) { + return false; + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + private StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyItemDataMaskInfo={"); + sb.append("dataMaskType={").append(dataMaskType).append("} "); + sb.append("conditionExpr={").append(conditionExpr).append("} "); + sb.append("valueExpr={").append(valueExpr).append("} "); + sb.append("}"); + return sb; + } + } + + /** + * Ranger policyItem Row-filter info class. 
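+ * <p>
+ * For example (hypothetical expression), {@code new RangerPolicyItemRowFilterInfo("region = 'EU'")}
+ * simply carries the row-filter expression as a string; no validation is performed here.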
+ */ + @JsonAutoDetect(fieldVisibility = Visibility.ANY) + @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown = true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.FIELD) + public static class RangerPolicyItemRowFilterInfo implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + private String filterExpr; + + public RangerPolicyItemRowFilterInfo() { + } + + public RangerPolicyItemRowFilterInfo(String filterExpr) { + setFilterExpr(filterExpr); + } + + public String getFilterExpr() { + return filterExpr; + } + + public void setFilterExpr(String filterExpr) { + this.filterExpr = filterExpr; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((filterExpr == null) ? 0 : filterExpr.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RangerPolicyItemRowFilterInfo other = (RangerPolicyItemRowFilterInfo) obj; + if (filterExpr == null) { + if (other.filterExpr != null) { + return false; + } + } else if (!filterExpr.equals(other.filterExpr)) { + return false; + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + toString(sb); + return sb.toString(); + } + + public StringBuilder toString(StringBuilder sb) { + sb.append("RangerPolicyItemDataMaskInfo={"); + sb.append("filterExpr={").append(filterExpr).append("} "); + sb.append("}"); + return sb; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java new file mode 100644 index 0000000000..3de935ac57 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerPolicyList.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * RangerPolicyList class to contain List of RangerPolicy objects. 
+ */ +@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, + fieldVisibility = Visibility.ANY) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class RangerPolicyList { + private static final long serialVersionUID = 1L; + + private List policies = new ArrayList(); + + public RangerPolicyList() { + super(); + } + + public RangerPolicyList(List objList) { + this.policies = objList; + } + + public List getPolicies() { + return policies; + } + + public void setPolicies(List policies) { + this.policies = policies; + } + + public int getListSize() { + if (policies != null) { + return policies.size(); + } + return 0; + } + + + public List getList() { + return policies; + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java new file mode 100644 index 0000000000..eab20f459e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.util.List; + +/** + * RangerRestClient to connect to Ranger service and export policies. + */ +public interface RangerRestClient { + RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, + String dbName, String rangerHiveServiceName) throws Exception; + + RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, + String baseUrl, + String rangerHiveServiceName) throws Exception; + + List removeMultiResourcePolicies(List rangerPolicies); + + List changeDataSet(List rangerPolicies, String sourceDbName, + String targetDbName); + + Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath, + String fileName, HiveConf conf) throws Exception; + + RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, + HiveConf conf) throws SemanticException; + + boolean checkConnection(String url) throws Exception; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java new file mode 100644 index 0000000000..c535f9ea07 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.ranger; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.multipart.FormDataMultiPart; +import com.sun.jersey.multipart.MultiPart; +import com.sun.jersey.multipart.file.StreamDataBodyPart; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.eclipse.jetty.util.MultiPartWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.MediaType; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.File; +import java.io.InputStreamReader; +import java.io.InputStream; +import java.io.Reader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; + +/** + * RangerRestClientImpl to connect to Ranger and export policies. + */ +public class RangerRestClientImpl implements RangerRestClient { + private static final Logger LOG = LoggerFactory.getLogger(RangerRestClientImpl.class); + private static final String RANGER_REST_URL_EXPORTJSONFILE = "/service/plugins/policies/exportJson"; + private static final String RANGER_REST_URL_IMPORTJSONFILE = + "/service/plugins/policies/importPoliciesFromFile?updateIfExists=true"; + + public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, + String dbName, + String rangerHiveServiceName)throws SemanticException { + LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint); + ClientResponse clientResp; + String uri; + if (StringUtils.isEmpty(rangerHiveServiceName)) { + throw new SemanticException("Ranger Service Name cannot be empty"); + } + uri = RANGER_REST_URL_EXPORTJSONFILE + "?serviceName=" + rangerHiveServiceName + "&polResource=" + + dbName + "&resource:database=" + dbName + + "&serviceType=hive&resourceMatchScope=self_or_ancestor&resourceMatch=full"; + if (sourceRangerEndpoint.endsWith("/")) { + sourceRangerEndpoint = StringUtils.removePattern(sourceRangerEndpoint, "/+$"); + } + String url = sourceRangerEndpoint + (uri.startsWith("/") ? 
uri : ("/" + uri)); + LOG.debug("Url to export policies from source Ranger: {}", url); + RangerExportPolicyList rangerExportPolicyList = new RangerExportPolicyList(); + WebResource.Builder builder = getRangerResourceBuilder(url); + clientResp = builder.get(ClientResponse.class); + + String response = null; + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_OK) { + Gson gson = new GsonBuilder().create(); + response = clientResp.getEntity(String.class); + LOG.debug("Response received for ranger export {} ", response); + if (StringUtils.isNotEmpty(response)) { + rangerExportPolicyList = gson.fromJson(response, RangerExportPolicyList.class); + return rangerExportPolicyList; + } + } else if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy export request returned empty list"); + return rangerExportPolicyList; + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new SemanticException("Authentication Failure while communicating to Ranger admin"); + } else if (clientResp.getStatus() == HttpServletResponse.SC_FORBIDDEN) { + throw new SemanticException("Authorization Failure while communicating to Ranger admin"); + } + } + if (StringUtils.isEmpty(response)) { + LOG.debug("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); + } + return rangerExportPolicyList; + } + + public List removeMultiResourcePolicies(List rangerPolicies) { + List rangerPoliciesToImport = new ArrayList(); + if (CollectionUtils.isNotEmpty(rangerPolicies)) { + Map rangerPolicyResourceMap = null; + RangerPolicy.RangerPolicyResource rangerPolicyResource = null; + List resourceNameList = null; + for (RangerPolicy rangerPolicy : rangerPolicies) { + if (rangerPolicy != null) { + rangerPolicyResourceMap = rangerPolicy.getResources(); + if (rangerPolicyResourceMap != null) { + rangerPolicyResource = rangerPolicyResourceMap.get("database"); + if (rangerPolicyResource != null) { + resourceNameList = rangerPolicyResource.getValues(); + if (CollectionUtils.isNotEmpty(resourceNameList) && resourceNameList.size() == 1) { + rangerPoliciesToImport.add(rangerPolicy); + } + } + } + } + } + } + return rangerPoliciesToImport; + } + + @Override + public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, + String baseUrl, + String rangerHiveServiceName) + throws Exception { + String sourceClusterServiceName = null; + String serviceMapJsonFileName = "hive_servicemap.json"; + String rangerPoliciesJsonFileName = "hive_replicationPolicies.json"; + String uri = RANGER_REST_URL_IMPORTJSONFILE + "&polResource=" + dbName; + + if (!rangerExportPolicyList.getPolicies().isEmpty()) { + sourceClusterServiceName = rangerExportPolicyList.getPolicies().get(0).getService(); + } + + if (StringUtils.isEmpty(sourceClusterServiceName)) { + sourceClusterServiceName = rangerHiveServiceName; + } + + Map serviceMap = new LinkedHashMap(); + if (!StringUtils.isEmpty(sourceClusterServiceName) && !StringUtils.isEmpty(rangerHiveServiceName)) { + serviceMap.put(sourceClusterServiceName, rangerHiveServiceName); + } + + Gson gson = new GsonBuilder().create(); + String jsonServiceMap = gson.toJson(serviceMap); + + String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList); + + String url = baseUrl + + (uri.startsWith("/") ? 
uri : ("/" + uri)); + + LOG.debug("URL to import policies on target Ranger: {}", url); + ClientResponse clientResp = null; + + StreamDataBodyPart filePartPolicies = new StreamDataBodyPart("file", + new ByteArrayInputStream(jsonRangerExportPolicyList.getBytes(StandardCharsets.UTF_8)), + rangerPoliciesJsonFileName); + StreamDataBodyPart filePartServiceMap = new StreamDataBodyPart("servicesMapJson", + new ByteArrayInputStream(jsonServiceMap.getBytes(StandardCharsets.UTF_8)), serviceMapJsonFileName); + + FormDataMultiPart formDataMultiPart = new FormDataMultiPart(); + MultiPart multipartEntity = null; + try { + multipartEntity = formDataMultiPart.bodyPart(filePartPolicies).bodyPart(filePartServiceMap); + WebResource.Builder builder = getRangerResourceBuilder(url); + clientResp = builder.accept(MediaType.APPLICATION_JSON).type(MediaType.MULTIPART_FORM_DATA) + .post(ClientResponse.class, multipartEntity); + if (clientResp != null) { + if (clientResp.getStatus() == HttpServletResponse.SC_NO_CONTENT) { + LOG.debug("Ranger policy import finished successfully"); + + } else if (clientResp.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) { + throw new Exception("Authentication Failure while communicating to Ranger admin"); + } else { + throw new Exception("Ranger policy import failed, Please refer target Ranger admin logs."); + } + } + } finally { + try { + if (filePartPolicies != null) { + filePartPolicies.cleanup(); + } + if (filePartServiceMap != null) { + filePartServiceMap.cleanup(); + } + if (formDataMultiPart != null) { + formDataMultiPart.close(); + } + if (multipartEntity != null) { + multipartEntity.close(); + } + } catch (IOException e) { + LOG.error("Exception occurred while closing resources: {}", e); + } + } + return rangerExportPolicyList; + } + + private synchronized Client getRangerClient() { + Client ret = null; + ClientConfig config = new DefaultClientConfig(); + config.getClasses().add(MultiPartWriter.class); + config.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true); + ret = Client.create(config); + return ret; + } + + @Override + public List changeDataSet(List rangerPolicies, String sourceDbName, + String targetDbName) { + if (sourceDbName.endsWith("/")) { + sourceDbName = StringUtils.removePattern(sourceDbName, "/+$"); + } + if (targetDbName.endsWith("/")) { + targetDbName = StringUtils.removePattern(targetDbName, "/+$"); + } + if (targetDbName.equals(sourceDbName)) { + return rangerPolicies; + } + if (CollectionUtils.isNotEmpty(rangerPolicies)) { + Map rangerPolicyResourceMap = null; + RangerPolicy.RangerPolicyResource rangerPolicyResource = null; + List resourceNameList = null; + for (RangerPolicy rangerPolicy : rangerPolicies) { + if (rangerPolicy != null) { + rangerPolicyResourceMap = rangerPolicy.getResources(); + if (rangerPolicyResourceMap != null) { + rangerPolicyResource = rangerPolicyResourceMap.get("database"); + if (rangerPolicyResource != null) { + resourceNameList = rangerPolicyResource.getValues(); + if (CollectionUtils.isNotEmpty(resourceNameList)) { + for (int i = 0; i < resourceNameList.size(); i++) { + String resourceName = resourceNameList.get(i); + if (resourceName.equals(sourceDbName)) { + resourceNameList.set(i, targetDbName); + } + } + } + } + } + } + } + } + return rangerPolicies; + } + + private Path writeExportedRangerPoliciesToJsonFile(String jsonString, String fileName, Path stagingDirPath, + HiveConf conf) + throws IOException { + String filePath = ""; + Path newPath = null; + FSDataOutputStream outStream = null; + 
OutputStreamWriter writer = null; + try { + if (!StringUtils.isEmpty(jsonString)) { + FileSystem fileSystem = stagingDirPath.getFileSystem(conf); + if (fileSystem != null) { + if (!fileSystem.exists(stagingDirPath)) { + fileSystem.mkdirs(stagingDirPath); + } + newPath = stagingDirPath.suffix(File.separator + fileName); + outStream = fileSystem.create(newPath, true); + writer = new OutputStreamWriter(outStream, "UTF-8"); + writer.write(jsonString); + } + } + } catch (IOException ex) { + if (newPath != null) { + filePath = newPath.toString(); + } + throw new IOException("Failed to write json string to file:" + filePath, ex); + } catch (Exception ex) { + if (newPath != null) { + filePath = newPath.toString(); + } + throw new IOException("Failed to write json string to file:" + filePath, ex); + } finally { + try { + if (writer != null) { + writer.close(); + } + if (outStream != null) { + outStream.close(); + } + } catch (Exception ex) { + throw new IOException("Unable to close writer/outStream.", ex); + } + } + return newPath; + } + + @Override + public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath, + String fileName, HiveConf conf) throws Exception { + Gson gson = new GsonBuilder().create(); + String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList); + Retry retriable = new Retry(IOException.class) { + @Override + public Path execute() throws IOException { + return writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName, + stagingDirPath, conf); + } + }; + try { + return retriable.run(); + } catch (Exception e) { + throw new SemanticException(e); + } + } + + @Override + public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, + HiveConf conf) throws SemanticException { + RangerExportPolicyList rangerExportPolicyList = null; + Gson gsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create(); + try { + FileSystem fs = filePath.getFileSystem(conf); + InputStream inputStream = fs.open(filePath); + Reader reader = new InputStreamReader(inputStream, Charset.forName("UTF-8")); + rangerExportPolicyList = gsonBuilder.fromJson(reader, RangerExportPolicyList.class); + } catch (Exception ex) { + throw new SemanticException("Error reading file :" + filePath, ex); + } + return rangerExportPolicyList; + } + + @Override + public boolean checkConnection(String url) { + WebResource.Builder builder; + builder = getRangerResourceBuilder(url); + ClientResponse clientResp = builder.get(ClientResponse.class); + return (clientResp.getStatus() < HttpServletResponse.SC_UNAUTHORIZED); + } + + + private WebResource.Builder getRangerResourceBuilder(String url) { + Client client = getRangerClient(); + WebResource webResource = client.resource(url); + WebResource.Builder builder = webResource.getRequestBuilder(); + return builder; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 4fcee0e34a..377f742a70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -79,6 +79,9 @@ // Root base directory name for hive. public static final String REPL_HIVE_BASE_DIR = "hive"; + // Root base directory name for ranger. 
+ public static final String REPL_RANGER_BASE_DIR = "ranger"; + // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; @@ -100,6 +103,10 @@ // Reserved number of items to accommodate operational files in the dump root dir. public static final int RESERVED_DIR_ITEMS_COUNT = 10; + + public static final String RANGER_AUTHORIZER = "ranger"; + + public static final String HIVE_RANGER_POLICIES_FILE_NAME = "ranger_policies.json"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. */ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index c4ff070da6..7959df2b2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -395,7 +395,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } else { LOG.debug("{} contains an bootstrap dump", loadPath); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(), + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), sourceDbNameOrPattern, + replScope.getDbName(), dmd.getReplScope(), queryState.getLineageState(), evDump, dmd.getEventTo()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java new file mode 100644 index 0000000000..fc3cfe3c32 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_RANGER_SERVICE_NAME; + +/** + * Unit test class for testing Ranger Dump + */ +@RunWith(MockitoJUnitRunner.class) +public class TestRangerDumpTask { + + protected static final Logger LOG = LoggerFactory.getLogger(TestRangerDumpTask.class); + private RangerDumpTask task; + + @Mock + private RangerRestClientImpl mockClient; + + @Mock + private HiveConf conf; + + @Mock + private RangerDumpWork work; + + @Before + public void setup() throws Exception { + task = new RangerDumpTask(mockClient, conf, work); + Mockito.when(mockClient.removeMultiResourcePolicies(Mockito.anyList())).thenCallRealMethod(); + Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true); + } + + @Test + public void testFailureInvalidAuthProviderEndpoint() throws Exception { + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn(null); + int status = task.execute(); + Assert.assertEquals(40000, status); + } + + @Test + public void testSuccessValidAuthProviderEndpoint() throws Exception { + RangerExportPolicyList rangerPolicyList = new RangerExportPolicyList(); + rangerPolicyList.setPolicies(new ArrayList()); + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(rangerPolicyList); + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint"); + Mockito.when(conf.getVar(REPL_RANGER_SERVICE_NAME)).thenReturn("cm_hive"); + Mockito.when(work.getDbName()).thenReturn("testdb"); + int status = task.execute(); + Assert.assertEquals(0, status); + } + + @Test + public void testSuccessNonEmptyRangerPolicies() throws Exception { + String rangerResponse = "{\"metaDataInfo\":{\"Host name\":\"org.apache.ranger.com\"," + + "\"Exported by\":\"hive\",\"Export time\":\"May 5, 2020, 8:55:03 AM\",\"Ranger apache version\"" + + ":\"2.0.0.7.2.0.0-61\"},\"policies\":[{\"service\":\"cm_hive\",\"name\":\"db-level\",\"policyType\":0," + + "\"description\":\"\",\"isAuditEnabled\":true,\"resources\":{\"database\":{\"values\":[\"aa\"]," + + "\"isExcludes\":false,\"isRecursive\":false},\"column\":{\"values\":[\"id\"],\"isExcludes\":false," + + "\"isRecursive\":false},\"table\":{\"values\":[\"*\"],\"isExcludes\":false,\"isRecursive\":false}}," + + "\"policyItems\":[{\"accesses\":[{\"type\":\"select\",\"isAllowed\":true},{\"type\":\"update\"," + + "\"isAllowed\":true}],\"users\":[\"admin\"],\"groups\":[\"public\"],\"conditions\":[]," + + "\"delegateAdmin\":false}],\"denyPolicyItems\":[],\"allowExceptions\":[],\"denyExceptions\":[]," + + 
"\"dataMaskPolicyItems\":[],\"rowFilterPolicyItems\":[],\"id\":40,\"guid\":" + + "\"4e2b3406-7b9a-4004-8cdf-7a239c8e2cae\",\"isEnabled\":true,\"version\":1}]}"; + RangerExportPolicyList rangerPolicyList = new Gson().fromJson(rangerResponse, RangerExportPolicyList.class); + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(rangerPolicyList); + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint"); + Mockito.when(conf.getVar(REPL_RANGER_SERVICE_NAME)).thenReturn("cm_hive"); + Mockito.when(work.getDbName()).thenReturn("testdb"); + Path rangerDumpPath = new Path("/tmp"); + Mockito.when(work.getCurrentDumpPath()).thenReturn(rangerDumpPath); + Path policyFile = new Path(rangerDumpPath, ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME); + Mockito.when(mockClient.saveRangerPoliciesToFile(rangerPolicyList, rangerDumpPath, + ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, conf)).thenReturn(policyFile); + int status = task.execute(); + Assert.assertEquals(0, status); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java new file mode 100644 index 0000000000..a385b70682 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.repl; + +import com.google.gson.Gson; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList; +import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT; + +/** + * Unit test class for testing Ranger Load + */ +@RunWith(MockitoJUnitRunner.class) +public class TestRangerLoadTask { + + protected static final Logger LOG = LoggerFactory.getLogger(TestRangerLoadTask.class); + private RangerLoadTask task; + + @Mock + private RangerRestClientImpl mockClient; + + @Mock + private HiveConf conf; + + @Mock + private RangerLoadWork work; + + @Before + public void setup() throws Exception { + task = new RangerLoadTask(mockClient, conf, work); + Mockito.when(mockClient.changeDataSet(Mockito.anyList(), Mockito.anyString(), Mockito.anyString())) + .thenCallRealMethod(); + Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true); + } + + @Test + public void testFailureInvalidAuthProviderEndpoint() { + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn(null); + int status = task.execute(); + Assert.assertEquals(40000, status); + } + + @Test + public void testSuccessValidAuthProviderEndpoint() { + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint"); + Mockito.when(work.getSourceDbName()).thenReturn("srcdb"); + Mockito.when(work.getTargetDbName()).thenReturn("tgtdb"); + int status = task.execute(); + Assert.assertEquals(0, status); + } + + @Test + public void testSuccessNonEmptyRangerPolicies() throws Exception { + String rangerResponse = "{\"metaDataInfo\":{\"Host name\":\"org.apache.ranger.com\"," + + "\"Exported by\":\"hive\",\"Export time\":\"May 5, 2020, 8:55:03 AM\",\"Ranger apache version\"" + + ":\"2.0.0.7.2.0.0-61\"},\"policies\":[{\"service\":\"cm_hive\",\"name\":\"db-level\",\"policyType\":0," + + "\"description\":\"\",\"isAuditEnabled\":true,\"resources\":{\"database\":{\"values\":[\"aa\"]," + + "\"isExcludes\":false,\"isRecursive\":false},\"column\":{\"values\":[\"id\"],\"isExcludes\":false," + + "\"isRecursive\":false},\"table\":{\"values\":[\"*\"],\"isExcludes\":false,\"isRecursive\":false}}," + + "\"policyItems\":[{\"accesses\":[{\"type\":\"select\",\"isAllowed\":true},{\"type\":\"update\"," + + "\"isAllowed\":true}],\"users\":[\"admin\"],\"groups\":[\"public\"],\"conditions\":[]," + + "\"delegateAdmin\":false}],\"denyPolicyItems\":[],\"allowExceptions\":[],\"denyExceptions\":[]," + + "\"dataMaskPolicyItems\":[],\"rowFilterPolicyItems\":[],\"id\":40,\"guid\":" + + "\"4e2b3406-7b9a-4004-8cdf-7a239c8e2cae\",\"isEnabled\":true,\"version\":1}]}"; + RangerExportPolicyList rangerPolicyList = new Gson().fromJson(rangerResponse, RangerExportPolicyList.class); + Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint"); + Mockito.when(work.getSourceDbName()).thenReturn("srcdb"); + Mockito.when(work.getTargetDbName()).thenReturn("tgtdb"); + Path rangerDumpPath = new
Path("/tmp"); + Mockito.when(work.getCurrentDumpPath()).thenReturn(rangerDumpPath); + mockClient.saveRangerPoliciesToFile(rangerPolicyList, + rangerDumpPath, ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, new HiveConf()); + Mockito.when(mockClient.readRangerPoliciesFromJsonFile(Mockito.any(), Mockito.any())).thenReturn(rangerPolicyList); + int status = task.execute(); + Assert.assertEquals(0, status); + } +}