diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5a39006d8a..405daed3e6 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -542,6 +542,16 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal "hive", "This configuration will define the service name for which the ranger authorization" + " policies needs to be replicated"), + REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false, + "Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."), + REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null, + "Atlas endpoint of the current cluster from/to which the hive database is being replicated."), + REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null, + "Name of the target hive database to which Atlas metadata of the source hive database is being replicated."), + REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null, + "Name of the source cluster for the replication."), + REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null, + "Name of the target cluster for the replication."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index eeb81dad25..9e692c5b57 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -43,11 +43,18 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1561,4 +1568,142 @@ public void testFailureUnsupportedAuthorizerReplication() throws Throwable { assertEquals("Authorizer sentry not supported for replication ", e.getMessage()); } } + + //Testing just the configs and no impact on existing replication + @Test + public void testAtlasReplication() throws Throwable { + Map<String, String> confMap = defaultAtlasConfMap(); + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)") + .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap())); + verifyAtlasMetadataPresent(); + + confMap.remove("hive.repl.atlas.replicatedto"); + replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + } + + @Test + public void 
testAtlasMissingConfigs() throws Throwable { + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)"); + Map<String, String> confMap = new HashMap<>(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); + ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + primary.dump(primaryDbName, getAtlasClause(confMap)); + verifyAtlasMetadataPresent(); + + confMap.clear(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); + ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)); + } + + private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, atlasClause); + } else { + primary.load(replicatedDbName, primaryDbName, atlasClause); + } + } catch (MalformedURLException e) { + return; + } + Assert.fail("Atlas endpoint is invalid but the test didn't fail: " + endpoint); + } + + private void verifyAtlasMetadataPresent() throws IOException { + Path dbReplDir = new Path(primary.repldDir, + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf()); + assertTrue(fs.exists(dbReplDir)); + FileStatus[] dumpRoots = fs.listStatus(dbReplDir); + assertEquals(1, dumpRoots.length); + Path dumpRoot = dumpRoots[0].getPath(); + assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, 
ReplUtils.REPL_HIVE_BASE_DIR))); + Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR); + assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot)); + assertTrue("Atlas export file doesn't exist", + fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME))); + assertTrue("Atlas dump metadata doesn't exist", + fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME))); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader( + fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset())); + String[] lineContents = br.readLine().split("\t", 5); + assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]); + assertEquals(0, Long.parseLong(lineContents[1])); + } finally { + if (br != null) { + br.close(); + } + } + } + + private void ensureFailedReplOperation(List clause, String conf, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, clause); + } else { + primary.load(replicatedDbName, primaryDbName, clause); + } + Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail."); + } catch (SemanticException e) { + assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication")); + } + } + + private Map defaultAtlasConfMap() { + Map confMap = new HashMap<>(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + return confMap; + } + + private List getAtlasClause(Map confMap) { + List confList = new ArrayList(); + for (Map.Entry entry:confMap.entrySet()) { + confList.add(quote(entry.getKey()) + "=" + quote(entry.getValue())); + } + return confList; + } + + private String quote(String str) { + return "'" + str + "'"; + } } diff --git a/pom.xml b/pom.xml index a8513f6f1f..df38d73ddd 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,7 @@ 1.5.7 0.10.0 + 2.0.0 1.12.0 1.8.2 1.21.0 @@ -138,6 +139,7 @@ 1.7 3.2.2 1.19 + 1.10 1.1 2.6 3.9 diff --git a/ql/pom.xml b/ql/pom.xml index 9bf7b902b9..52fd50bd56 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -37,6 +37,26 @@ + + org.apache.atlas + atlas-client-v2 + ${atlas.client.version} + + + org.apache.atlas + atlas-client-common + ${atlas.client.version} + + + org.apache.atlas + atlas-intg + ${atlas.client.version} + + + commons-configuration + commons-configuration + ${commons-configuration.version} + org.apache.hive hive-vector-code-gen diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 57b028b711..0eb2dd11b2 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -28,7 +28,9 @@ SCHEDULED_QUERY_MAINT(17), ACK(18), RANGER_DUMP(19), - RANGER_LOAD(20); + RANGER_LOAD(20), + ATLAS_REPL_DUMP(21), + ATLAS_REPL_LOAD(22); private final int value; @@ -91,6 +93,10 @@ public static StageType findByValue(int value) { return RANGER_DUMP; case 20: return RANGER_LOAD; + case 21: + return ATLAS_REPL_DUMP; 
+ case 22: + return ATLAS_REPL_LOAD; default: return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index f0e54613fb..fd2715dc9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,10 @@ import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadWork; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; @@ -120,6 +124,8 @@ public TaskTuple(Class workClass, Class> taskClass) { taskvec.add(new TaskTuple(AckWork.class, AckTask.class)); taskvec.add(new TaskTuple(RangerDumpWork.class, RangerDumpTask.class)); taskvec.add(new TaskTuple(RangerLoadWork.class, RangerLoadTask.class)); + taskvec.add(new TaskTuple(AtlasDumpWork.class, AtlasDumpTask.class)); + taskvec.add(new TaskTuple(AtlasLoadWork.class, AtlasLoadTask.class)); taskvec.add(new TaskTuple(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple(ReplTxnWork.class, ReplTxnTask.class)); taskvec.add(new TaskTuple(DirCopyWork.class, DirCopyTask.class)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java new file mode 100644 index 0000000000..45daffb88f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Atlas Metadata Replication Dump Task. + **/ +public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class); + private static final long serialVersionUID = 1L; + private transient AtlasRestClient atlasRestClient; + + @Override + public int execute() { + try { + AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); + LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location: {}", + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint()) + .getClient(atlasReplInfo.getConf()); + AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); + String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(), + atlasReplInfo.getSrcDB()); + long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid); + dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo); + createDumpMetadata(atlasReplInfo, currentModifiedTime); + return 0; + } catch (Exception e) { + LOG.error("Exception while dumping atlas metadata", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + String errorFormat = "%s is mandatory config for Atlas metadata replication"; + //Also validates URL for endpoint. 
+ String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) + .toString(); + String tgtDB = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, conf, errorFormat); + String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat); + String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat); + AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), tgtDB, srcCluster, + tgtCluster, work.getStagingDir(), conf); + atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG)); + long lastTimeStamp = work.isBootstrap() ? 0L : lastStoredTimeStamp(); + atlasReplInfo.setTimeStamp(lastTimeStamp); + return atlasReplInfo; + } + + private long lastStoredTimeStamp() throws SemanticException { + Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME); + BufferedReader br = null; + try { + FileSystem fs = prevMetadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset())); + String[] lineContents = br.readLine().split("\t", 5); + return Long.parseLong(lineContents[1]); + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException { + AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster()); + long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null) + ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid); + LOG.debug("Current timestamp is: {}", ret); + return ret; + } + + private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo) + throws SemanticException { + InputStream inputStream = null; + try { + AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo, + atlasReplInfo.getSrcCluster()); + inputStream = atlasRestClient.exportData(exportRequest); + FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf()); + Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); + long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream); + LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten); + } catch (SemanticException ex) { + throw ex; + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName, + String srcDb) + throws SemanticException { + AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb); + Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet(); + if (entries == null || entries.isEmpty()) { + throw new SemanticException("Could not find entries in objectId for: " + clusterName); + } + Map.Entry<String, Object> item = entries.iterator().next(); + String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue()); + if (guid == null || guid.isEmpty()) { + throw new SemanticException("Entity not found: " + objectId); + } + 
return guid; + } + + private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException { + Path dumpFile = new Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME); + List<List<String>> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + atlasReplInfo.getSrcFsUri(), + String.valueOf(lastModifiedTime) + ) + ); + Utils.writeOutput(listValues, dumpFile, conf, true); + LOG.debug("Stored metadata for Atlas dump at: {}", dumpFile.toString()); + } + + @Override + public StageType getType() { + return StageType.ATLAS_REPL_DUMP; + } + + @Override + public String getName() { + return "ATLAS_REPL_DUMP"; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java new file mode 100644 index 0000000000..d60951872f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.nio.charset.Charset; + +/** + * Atlas metadata replication work. 
+ */ +@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class AtlasDumpWork implements Serializable { + private static final long serialVersionUID = 1L; + private final String srcDB; + private final Path stagingDir; + private final boolean bootstrap; + private final Path prevAtlasDumpDir; + + + public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) { + this.srcDB = srcDB; + this.stagingDir = stagingDir; + this.bootstrap = bootstrap; + this.prevAtlasDumpDir = prevAtlasDumpDir; + } + + public boolean isBootstrap() { + return bootstrap; + } + + public Path getPrevAtlasDumpDir() { + return prevAtlasDumpDir; + } + + public String getSrcDB() { + return srcDB; + } + + public Path getStagingDir() { + return stagingDir; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java new file mode 100644 index 0000000000..f320d5280c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; + +/** + * Atlas Metadata Replication Load Task. 
+ **/ +public class AtlasLoadTask extends Task implements Serializable { + private static final long serialVersionUID = 1L; + private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class); + + @Override + public int execute() { + try { + AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); + LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}", + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + int importCount = importAtlasMetadata(atlasReplInfo); + LOG.info("Atlas entities import count {}", importCount); + return 0; + } catch (Exception e) { + LOG.error("Exception while loading atlas metadata", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + String errorFormat = "%s is mandatory config for Atlas metadata replication"; + //Also validates URL for endpoint. + String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) + .toString(); + String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat); + String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat); + AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(), + srcCluster, tgtCluster, work.getStagingDir(), conf); + atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir())); + atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG)); + return atlasReplInfo; + } + + private String getStoredFsUri(Path atlasDumpDir) throws SemanticException { + Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME); + BufferedReader br = null; + try { + FileSystem fs = metadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset())); + String[] lineContents = br.readLine().split("\t", 5); + return lineContents[0]; + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception { + AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); + AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest( + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), + atlasReplInfo.getSrcCluster(), atlasReplInfo.getTgtCluster(), + atlasReplInfo.getSrcFsUri(), atlasReplInfo.getTgtFsUri()); + AtlasImportResult result = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint()) + .getClient(atlasReplInfo.getConf()).importData(importRequest, atlasReplInfo); + if (result == null || result.getProcessedEntities() == null) { + LOG.info("No Atlas entity found"); + return 0; + } + return result.getProcessedEntities().size(); + } + + @Override + public StageType getType() { + return StageType.ATLAS_REPL_LOAD; + } + + @Override + public String getName() { + return "ATLAS_REPL_LOAD"; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java new file mode 100644 index 0000000000..8f893de1bd --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.nio.charset.Charset; + +/** + * Atlas metadata replication load work. + */ +@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED}) +public class AtlasLoadWork implements Serializable { + private static final long serialVersionUID = 1L; + private final String srcDB; + private final String tgtDB; + private final Path stagingDir; + + public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) { + this.srcDB = srcDB; + this.tgtDB = tgtDB; + this.stagingDir = stagingDir; + } + + public static long getSerialVersionUID() { + return serialVersionUID; + } + + public String getSrcDB() { + return srcDB; + } + + public String getTgtDB() { + return tgtDB; + } + + public Path getStagingDir() { + return stagingDir; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index a7fd0ef2fa..c1fa19d438 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -149,6 +149,10 @@ public int execute() { if (shouldDumpAuthorizationMetadata()) { initiateAuthorizationDumpTask(); } + if (shouldDumpAtlasMetadata()) { + LOG.info("Added task to dump atlas metadata."); + addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath); + } DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. 
ReplChangeManager.getInstance(conf); @@ -194,6 +198,10 @@ private boolean shouldDumpAuthorizationMetadata() { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); } + private boolean shouldDumpAtlasMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); + } + private Path getEncodedDumpRootPath() throws UnsupportedEncodingException { return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() @@ -227,6 +235,17 @@ private void initiateDataCopyTasks() throws SemanticException { } } + private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) { + Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR); + Path prevAtlasDumpDir = prevHiveDumpDir == null ? null + : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); + AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir); + Task atlasDumpTask = TaskFactory.get(atlasDumpWork, conf); + childTasks = new ArrayList<>(); + childTasks.add(atlasDumpTask); + } + + private void finishRemainingTasks() throws SemanticException { Path dumpAckFile = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR + File.separator diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 7a309626cc..792e331884 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -107,6 +107,9 @@ public int execute() { } work.setRootTask(this); this.parentTasks = null; + if (shouldLoadAtlasMetadata()) { + addAtlasLoadTask(); + } if (shouldLoadAuthorizationMetadata()) { initiateAuthorizationLoadTask(); } @@ -141,8 +144,23 @@ private void initiateAuthorizationLoadTask() throws SemanticException { childTasks.add(rangerLoadTask); } else { throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) - + " not supported for replication "); + + " not supported for replication "); + } + } + + private void addAtlasLoadTask() throws HiveException { + Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); + LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir); + AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir); + Task atlasLoadTask = TaskFactory.get(atlasLoadWork, conf); + if (childTasks == null) { + childTasks = new ArrayList<>(); } + childTasks.add(atlasLoadTask); + } + + private boolean shouldLoadAtlasMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); } private int executeBootStrapLoad() throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java new file mode 100644 index 0000000000..b0923d7d62 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Atlas metadata replication info holder. + */ +public class AtlasReplInfo { + private final String srcDB; + private final String tgtDB; + private final String srcCluster; + private final String tgtCluster; + private final Path stagingDir; + private final HiveConf conf; + private final String atlasEndpoint; + private String srcFsUri; + private String tgtFsUri; + private long timeStamp; + + public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String srcCluster, + String tgtCluster, Path stagingDir, HiveConf conf) { + this.atlasEndpoint = atlasEndpoint; + this.srcDB = srcDB; + this.tgtDB = tgtDB; + this.srcCluster = srcCluster; + this.tgtCluster = tgtCluster; + this.stagingDir = stagingDir; + this.conf = conf; + } + + public String getSrcDB() { + return srcDB; + } + + public String getTgtDB() { + return tgtDB; + } + + public String getSrcCluster() { + return srcCluster; + } + + public String getTgtCluster() { + return tgtCluster; + } + + public Path getStagingDir() { + return stagingDir; + } + + public HiveConf getConf() { + return conf; + } + + public String getAtlasEndpoint() { + return atlasEndpoint; + } + + public String getSrcFsUri() { + return srcFsUri; + } + + public void setSrcFsUri(String srcFsUri) { + this.srcFsUri = srcFsUri; + } + + public String getTgtFsUri() { + return tgtFsUri; + } + + public void setTgtFsUri(String tgtFsUri) { + this.tgtFsUri = tgtFsUri; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java new file mode 100644 index 0000000000..fbe8466942 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; + +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.CLUSTER_NAME_SEPARATOR; + +/** + * Helper class to create export/import request. + */ +public class AtlasRequestBuilder { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class); + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + static final String ATLAS_TYPE_HIVE_DB = "hive_db"; + static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc"; + static final String QUALIFIED_NAME_FORMAT = "%s@%s"; + + private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName"; + private static final String ATTRIBUTE_NAME_NAME = ".name"; + private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo"; + private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom"; + private static final String ATTRIBUTE_NAME_LOCATION = ".location"; + + private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME; + private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME; + private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION; + private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION; + + private static final String TRANSFORM_ENTITY_SCOPE = "__entity"; + private static final String REPLICATED_TAG_NAME = "%s_replicated"; + + public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) { + List itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer); + Map options = getOptions(atlasReplInfo); + return createRequest(itemsToExport, options); + } + + private List getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) { + List atlasObjectIds = new ArrayList<>(); + final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB()); + atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); + return atlasObjectIds; + } + + private AtlasExportRequest createRequest(final List itemsToExport, + final Map options) { + AtlasExportRequest request = new AtlasExportRequest() { + { + setItemsToExport(itemsToExport); + setOptions(options); + } + }; + LOG.debug("createRequest: {}" + request); + return request; + } + + private Map getOptions(AtlasReplInfo atlasReplInfo) { + String targetCluster = atlasReplInfo.getTgtCluster(); + Map options = new HashMap<>(); + options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL); + options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp()); + options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true); + if (targetCluster != null && !targetCluster.isEmpty()) { + options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster); + } + return options; + } + + public AtlasObjectId getItemToExport(String srcCluster, String srcDB) { + final String qualifiedName = 
getQualifiedName(srcCluster, srcDB); + return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName); + } + + private String getQualifiedName(String clusterName, String srcDb) { + String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, srcDb.toLowerCase(), clusterName); + LOG.debug("Atlas getQualifiedName: {}", qualifiedName); + return qualifiedName; + } + + public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet, + String sourceClusterName, String targetClusterName, + String sourceFsEndpoint, String targetFsEndpoint) { + AtlasImportRequest request = new AtlasImportRequest(); + addTransforms(request.getOptions(), + sourceClusterName, targetClusterName, + sourceDataSet, targetDataSet, + sourceFsEndpoint, targetFsEndpoint); + addReplicatedFrom(request.getOptions(), sourceClusterName); + LOG.debug("Atlas metadata import request: {}", request); + return request; + } + + private void addTransforms(Map<String, String> options, String srcClusterName, + String tgtClusterName, String sourceDataSet, String targetDataSet, + String sourcefsEndpoint, String targetFsEndpoint) { + List<AttributeTransform> transforms = new ArrayList<>(); + String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName); + addClassificationTransform(transforms, + String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName)); + addClearReplicationAttributesTransform(transforms); + addClusterRenameTransform(transforms, srcClusterName, tgtClusterName); + if (!sourceDataSet.equals(targetDataSet)) { + addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet); + } + addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint); + options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms)); + } + + private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) { + transforms.add(create( + HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri, + HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri + ) + ); + transforms.add(create( + HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri, + HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri + ) + ); + } + + private void addDataSetRenameTransform(List<AttributeTransform> transforms, + String sourceDataSet, String targetDataSet) { + transforms.add(create( + HIVE_DB_NAME, "EQUALS: " + sourceDataSet, + HIVE_DB_NAME, "SET: " + targetDataSet)); + } + + private void addClusterRenameTransform(List<AttributeTransform> transforms, + String srcClusterName, String tgtClustername) { + transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName, + HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername)); + } + + private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) { + options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName); + } + + private void addClassificationTransform(List<AttributeTransform> transforms, String classificationName) { + transforms.add(create("__entity", "topLevel: ", + "__entity", "ADD_CLASSIFICATION: " + classificationName)); + } + + private String sanitizeForClassificationName(String s) { + if (s == null || s.isEmpty()) { + return s; + } + return s.replace('-', '_').replace(' ', '_'); + } + + private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) { + Map<String, String> actions = new HashMap<>(); + actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:"); + actions.put(TRANSFORM_ENTITY_SCOPE + "." 
+ ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:"); + + transforms.add(new AttributeTransform(null, actions)); + } + + private AttributeTransform create(String conditionLhs, String conditionRhs, + String actionLhs, String actionRhs) { + return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs), + Collections.singletonMap(actionLhs, actionRhs)); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java new file mode 100644 index 0000000000..dd72f83194 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import java.io.InputStream; + +/** + * Atlas RESTClient interface for Atlas' REST APIs. + */ +public interface AtlasRestClient { + + InputStream exportData(AtlasExportRequest request) throws Exception; + + AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception; + + AtlasServer getServer(String endpoint) throws SemanticException; + + String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName) + throws SemanticException; + + boolean getStatus() throws SemanticException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java new file mode 100644 index 0000000000..8bd016c20a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasException; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Builder for AtlasRestClient. + */ +public class AtlasRestClientBuilder { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class); + private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; + private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; + private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address"; + private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos"; + private static final String URL_SEPERATOR = ","; + + private UserGroupInformation userGroupInformation; + protected String incomingUrl; + protected String[] baseUrls; + + public AtlasRestClientBuilder(String urls) { + this.incomingUrl = urls; + if (urls.contains(URL_SEPERATOR)) { + this.baseUrls = urls.split(URL_SEPERATOR); + } else { + this.baseUrls = new String[]{urls}; + } + } + + public AtlasRestClient getClient(HiveConf conf) throws SemanticException { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) { + return new NoOpAtlasRestClient(); + } + return create(); + } + + private AtlasRestClient create() throws SemanticException { + if (baseUrls == null || baseUrls.length == 0) { + throw new SemanticException("baseUrls is not set."); + } + setUGInfo(); + initializeAtlasApplicationProperties(); + AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation, + this.userGroupInformation.getShortUserName(), baseUrls); + return new AtlasRestClientImpl(clientV2); + } + + private AtlasRestClientBuilder setUGInfo() throws SemanticException { + try { + this.userGroupInformation = UserGroupInformation.getLoginUser(); + LOG.info("AuthStrategy: Kerberos : urls: {} : userGroupInformation: {}", baseUrls, userGroupInformation); + } catch (Exception e) { + throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e); + } + return this; + } + + private void initializeAtlasApplicationProperties() throws SemanticException { + try { + Properties props = new Properties(); + props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1"); + props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0"); + props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl); + props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true"); + ApplicationProperties.set(ConfigurationConverter.getConfiguration(props)); + } catch (AtlasException e) { + throw new SemanticException(e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java new file mode 100644 index 0000000000..c8d738ef11 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -0,0 +1,175 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND; + +/** + * Implementation of RESTClient, encapsulates Atlas' REST APIs. 
+ */ +public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class); + private final AtlasClientV2 clientV2; + + public AtlasRestClientImpl(AtlasClientV2 clientV2) { + this.clientV2 = clientV2; + } + + private T runWithTimeout(Callable callable, long timeout, TimeUnit timeUnit) throws Exception { + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Future future = executor.submit(callable); + executor.shutdown(); + try { + return future.get(timeout, timeUnit); + } catch (TimeoutException e) { + future.cancel(true); + throw e; + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof Exception) { + throw (Exception) t; + } else { + throw new IllegalStateException(t); + } + } + } + + public InputStream exportData(AtlasExportRequest request) throws Exception { + LOG.debug("exportData: {}" + request); + return invokeWithRetry(new Callable() { + @Override + public InputStream call() throws Exception { + return clientV2.exportData(request); + } + }, null); + } + + public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception { + AtlasImportResult defaultResult = getDefaultAtlasImportResult(request); + Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); + FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf()); + if (!fs.exists(exportFilePath)) { + return defaultResult; + } + LOG.debug("Atlas import data request: {}" + request); + return invokeWithRetry(new Callable() { + @Override + public AtlasImportResult call() throws Exception { + InputStream is = null; + try { + is = fs.open(exportFilePath); + return clientV2.importData(request, is); + } finally { + if (is != null) { + is.close(); + } + } + } + }, defaultResult); + } + + private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) { + return new AtlasImportResult(request, "", "", "", 0L); + } + + public AtlasServer getServer(String endpoint) throws SemanticException { + try { + return clientV2.getServer(endpoint); + } catch (AtlasServiceException e) { + int statusCode = e.getStatus() != null ? e.getStatus().getStatusCode() : -1; + if (statusCode != NOT_FOUND.getStatusCode()) { + throw new SemanticException("Exception while getServer ", e.getCause()); + } + LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage()); + } + return null; + } + + public String getEntityGuid(final String entityType, + final String attributeName, final String qualifiedName) throws SemanticException { + int entityApiTimeOut = 10; + final Map attributes = new HashMap() { + { + put(attributeName, qualifiedName); + } + }; + + try { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout( + new Callable() { + @Override + public AtlasEntity.AtlasEntityWithExtInfo call() throws Exception { + return clientV2.getEntityByAttribute(entityType, attributes); + } + }, entityApiTimeOut, TimeUnit.SECONDS); + + if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) { + LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}", + entityType, attributeName, qualifiedName); + return null; + } + return entityWithExtInfo.getEntity().getGuid(); + } catch (AtlasServiceException e) { + int statusCode = e.getStatus() != null ? 
e.getStatus().getStatusCode() : -1; + if (statusCode != NOT_FOUND.getStatusCode()) { + throw new SemanticException("Exception while getEntityGuid", e.getCause()); + } + LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}: {}", + entityType, attributeName, qualifiedName, e.getMessage()); + return null; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + public boolean getStatus() throws SemanticException { + try { + return clientV2.isServerReady(); + } catch (AtlasServiceException e) { + throw new SemanticException(e.getCause()); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java new file mode 100644 index 0000000000..67df9493bb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.UUID; + +/** + * Dummy implementation of RESTClient, encapsulates Atlas' REST APIs. + * To be used for testing. + */ +public class NoOpAtlasRestClient implements AtlasRestClient { + + public InputStream exportData(AtlasExportRequest request) { + return new ByteArrayInputStream("Dummy".getBytes(Charset.forName("UTF-8"))); + } + + public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) { + return new AtlasImportResult(request, "", "", "", 0L); + } + + public AtlasServer getServer(String endpoint) { + return new AtlasServer(); + } + + public String getEntityGuid(final String entityType, + final String attributeName, final String qualifiedName) { + return UUID.randomUUID().toString(); + } + + public boolean getStatus() { + return true; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java new file mode 100644 index 0000000000..dbc065ab3c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import org.apache.atlas.AtlasServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +/** + * Implements retry logic for Atlas service calls. + */ +public class RetryingClient { + private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class); + private static final int PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT = (30 * 1000); + private static final int RETRY_COUNT_DEFAULT = 5; + private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update"; + private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress"; + private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file"; + private static final int MAX_RETRY_COUNT = RETRY_COUNT_DEFAULT; + private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MS_DEFAULT; + + protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception { + for (int currentRetryCount = 1; currentRetryCount <= MAX_RETRY_COUNT; currentRetryCount++) { + try { + LOG.debug("Retrying method: {}", func.getClass().getName()); + return func.call(); + } catch (Exception e) { + if (processImportExportLockException(e, currentRetryCount)) { + continue; + } + if (processInvalidParameterException(e)) { + return null; + } + LOG.error(func.getClass().getName(), e); + throw new Exception(e); + } + } + return defaultReturnValue; + } + + private boolean processInvalidParameterException(Exception e) { + if (e instanceof UniformInterfaceException) { + return true; + } + if (!(e instanceof AtlasServiceException)) { + return false; + } + if (e.getMessage() == null) { + return false; + } + return (e.getMessage().contains(ERROR_MESSAGE_NO_ENTITIES) + || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP)); + } + + private boolean processImportExportLockException(Exception e, int currentRetryCount) throws Exception { + if (!(e instanceof AtlasServiceException)) { + return false; + } + String excMessage = e.getMessage() == null ? "" : e.getMessage(); + if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) { + try { + int pauseDuration = PAUSE_DURATION_INCREMENT_IN_MS * currentRetryCount; + LOG.info("Atlas in-progress operation detected.
Will pause for: {} ms", pauseDuration); + Thread.sleep(pauseDuration); + } catch (InterruptedException intEx) { + LOG.error("Pause wait interrupted!", intEx); + throw new Exception(intEx); + } + return true; + } + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 377f742a70..66a2efbfbc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc; @@ -82,6 +83,19 @@ // Root base directory name for ranger. public static final String REPL_RANGER_BASE_DIR = "ranger"; + // Root base directory name for atlas. + public static final String REPL_ATLAS_BASE_DIR = "atlas"; + + // Atlas meta data export file. + public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip"; + + // Config for hadoop default file system. + public static final String DEFAULT_FS_CONFIG = "fs.defaultFS"; + + // Cluster name separator, used when the cluster name contains data center name as well, e.g. dc$mycluster1. + public static final String CLUSTER_NAME_SEPARATOR = "$"; + + // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. 
public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; @@ -178,6 +192,14 @@ public static boolean replCkptStatus(String dbName, Map props, S return false; } + public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat) throws SemanticException { + String val = hiveConf.get(configParam); + if (StringUtils.isEmpty(val)) { + throw new SemanticException(String.format(errorMsgFormat, configParam)); + } + return val; + } + public static boolean isTableMigratingToTransactional(HiveConf conf, org.apache.hadoop.hive.metastore.api.Table tableObj) throws TException, IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 44320a5932..154f02809e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; @@ -46,6 +47,7 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,6 +59,7 @@ public class Utils { private static Logger LOG = LoggerFactory.getLogger(Utils.class); public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state."; + private static final int DEF_BUF_SIZE = 8 * 1024; public enum ReplDumpState { IDLE, ACTIVE @@ -97,6 +100,34 @@ public Void execute() throws IOException { } } + public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws SemanticException { + Retry<Long> retriable = new Retry<Long>(IOException.class) { + @Override + public Long execute() throws IOException { + FSDataOutputStream fos = null; + try { + long bytesWritten; + fos = fs.create(exportFilePath); + byte[] buffer = new byte[DEF_BUF_SIZE]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + fos.write(buffer, 0, bytesRead); + } + bytesWritten = fos.getPos(); + return bytesWritten; + } finally { + if (fos != null) { + fos.close(); + } + } + }}; + try { + return retriable.run(); + } catch (Exception e) { + throw new SemanticException(e); + } + } + + public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException { + Retry<Void> retriable = new Retry<Void>(IOException.class) {