diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index d2d101ee07..2c3a77851a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -138,7 +138,7 @@ private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedU
     String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
     String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
     AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), tgtDB, srcCluster,
-      tgtCluster, work.getStagingDir(), conf);
+      tgtCluster, work.getStagingDir(), work.getTableListPath(), conf);
     atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
     long lastTimeStamp = work.isBootstrap() ? 0L : lastStoredTimeStamp();
     atlasReplInfo.setTimeStamp(lastTimeStamp);
@@ -195,8 +195,7 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at
     InputStream inputStream = null;
     long numBytesWritten = 0L;
     try {
-      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
-        atlasReplInfo.getSrcCluster());
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo);
       inputStream = atlasRestClient.exportData(exportRequest);
       if (inputStream == null) {
         LOG.info("There is no Atlas metadata to be exported");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
index 3f10730be4..a71d3e5d71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
@@ -36,14 +36,16 @@
   private final boolean bootstrap;
   private final Path prevAtlasDumpDir;
   private final transient ReplicationMetricCollector metricCollector;
+  private final Path tableListPath;
 
-  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir,
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir, Path tableListPath,
                        ReplicationMetricCollector metricCollector) {
     this.srcDB = srcDB;
     this.stagingDir = stagingDir;
     this.bootstrap = bootstrap;
     this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.tableListPath = tableListPath;
     this.metricCollector = metricCollector;
   }
 
@@ -63,6 +65,10 @@ public Path getStagingDir() {
     return stagingDir;
   }
 
+  public Path getTableListPath() {
+    return tableListPath;
+  }
+
   public ReplicationMetricCollector getMetricCollector() {
     return metricCollector;
   }
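Note on the two files above: the table-list path is threaded through AtlasDumpWork as a plain constructor argument, and DB-level replication keeps the old behaviour by passing null. A minimal construction sketch (the directory layout and the metricCollector variable are illustrative, not part of this patch):

  // Table-level policy: reuse the table list the Hive dump already wrote.
  AtlasDumpWork tableLevel = new AtlasDumpWork("srcdb", atlasDumpDir, true, prevAtlasDumpDir,
      new Path(hiveDumpDir, "_tables/srcdb"), metricCollector);

  // DB-level policy: no table list, so the Atlas dump still exports the whole hive_db entity.
  AtlasDumpWork dbLevel = new AtlasDumpWork("srcdb", atlasDumpDir, true, prevAtlasDumpDir,
      null, metricCollector);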
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index a44aa435aa..a51010e0c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -121,7 +121,7 @@ AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLExcept
     String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
     String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
     AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(),
-      srcCluster, tgtCluster, work.getStagingDir(), conf);
+      srcCluster, tgtCluster, work.getStagingDir(), null, conf);
     atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
     atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
     return atlasReplInfo;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index d5e192ecf1..f5e03ed6e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -277,8 +277,13 @@ private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) {
     Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR);
     Path prevAtlasDumpDir = prevHiveDumpDir == null ? null :
         new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR);
+    Path tableListLoc = null;
+    if (!work.replScope.includeAllTables()) {
+      Path tableListDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR + "/" + ReplUtils.REPL_TABLE_LIST_DIR_NAME);
+      tableListLoc = new Path(tableListDir, work.dbNameOrPattern.toLowerCase());
+    }
     AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir,
-      work.getMetricCollector());
+      tableListLoc, work.getMetricCollector());
     Task<?> atlasDumpTask = TaskFactory.get(atlasDumpWork, conf);
     childTasks = new ArrayList<>();
     childTasks.add(atlasDumpTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
index b0923d7d62..2505a58e21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java
@@ -35,9 +35,10 @@
   private String srcFsUri;
   private String tgtFsUri;
   private long timeStamp;
+  private Path tableListFile;
 
   public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String srcCluster,
-                      String tgtCluster, Path stagingDir, HiveConf conf) {
+                      String tgtCluster, Path stagingDir, Path tableListFile, HiveConf conf) {
     this.atlasEndpoint = atlasEndpoint;
     this.srcDB = srcDB;
     this.tgtDB = tgtDB;
@@ -45,6 +46,7 @@ public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String sr
     this.tgtCluster = tgtCluster;
     this.stagingDir = stagingDir;
     this.conf = conf;
+    this.tableListFile = tableListFile;
   }
 
   public String getSrcDB() {
@@ -98,4 +100,16 @@ public long getTimeStamp() {
     return timeStamp;
   }
 
   public void setTimeStamp(long timeStamp) {
     this.timeStamp = timeStamp;
   }
+
+  public Path getTableListFile() {
+    return tableListFile;
+  }
+
+  public void setTableListFile(Path tableListFile) {
+    this.tableListFile = tableListFile;
+  }
+
+  public boolean isTableLevelRepl() {
+    return this.tableListFile != null;
+  }
 }
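The null/non-null contract on tableListFile drives everything in AtlasRequestBuilder below; a quick sketch of the intended semantics (endpoint, paths, and conf are placeholders):

  AtlasReplInfo dbLevel = new AtlasReplInfo(endpoint, "srcdb", "tgtdb", "src", "tgt",
      stagingDir, null, conf);
  // dbLevel.isTableLevelRepl() == false: the export request targets one hive_db entity.

  AtlasReplInfo tableLevel = new AtlasReplInfo(endpoint, "srcdb", "tgtdb", "src", "tgt",
      stagingDir, tableListFile, conf);
  // tableLevel.isTableLevelRepl() == true: the export request targets the hive_table
  // entities named in tableListFile.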
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
index 3c72f8fcf5..a0af6e3200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
@@ -23,10 +23,20 @@
 import org.apache.atlas.model.impexp.AttributeTransform;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -40,8 +50,10 @@
   private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class);
   public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
   static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  public static final String ATLAS_TYPE_HIVE_TABLE = "hive_table";
   static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
   static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+  static final String QUALIFIED_NAME_HIVE_TABLE_FORMAT = "%s.%s";
 
   private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
   private static final String ATTRIBUTE_NAME_NAME = ".name";
@@ -57,16 +69,25 @@
   private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
   private static final String REPLICATED_TAG_NAME = "%s_replicated";
 
-  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) {
-    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer);
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo)
+      throws SemanticException {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo);
     Map<String, Object> options = getOptions(atlasReplInfo);
     return createRequest(itemsToExport, options);
   }
 
-  private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) {
+  private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo) throws SemanticException {
     List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
-    final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB());
-    atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+    if (atlasReplInfo.isTableLevelRepl()) {
+      final List<String> tableQualifiedNames = getQualifiedNames(atlasReplInfo.getSrcCluster(),
+          atlasReplInfo.getSrcDB(), atlasReplInfo.getTableListFile(), atlasReplInfo.getConf());
+      for (String tableQualifiedName : tableQualifiedNames) {
+        atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName));
+      }
+    } else {
+      final String qualifiedName = getQualifiedName(atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB());
+      atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+    }
     return atlasObjectIds;
   }
@@ -105,6 +126,50 @@ private String getQualifiedName(String clusterName, String srcDb) {
     return qualifiedName;
   }
 
+  private String getQualifiedName(String clusterName, String srcDB, String tableName) {
+    String qualifiedTableName = String.format(QUALIFIED_NAME_HIVE_TABLE_FORMAT, srcDB, tableName);
+    return getQualifiedName(clusterName, qualifiedTableName);
+  }
+
+  private List<String> getQualifiedNames(String clusterName, String srcDb, Path listOfTablesFile, HiveConf conf)
+      throws SemanticException {
+    List<String> qualifiedNames = new ArrayList<>();
+    List<String> tableNames = getFileAsList(listOfTablesFile, conf);
+    if (CollectionUtils.isEmpty(tableNames)) {
+      LOG.info("Empty file encountered: {}", listOfTablesFile);
+      return qualifiedNames;
+    }
+    for (String tableName : tableNames) {
+      qualifiedNames.add(getQualifiedName(clusterName, srcDb, tableName));
+    }
+    return qualifiedNames;
+  }
+
+  private static List<String> getFileAsList(Path listOfTablesFile, HiveConf conf) throws SemanticException {
+    List<String> list = new ArrayList<>();
+    InputStream is = null;
+    try {
+      FileSystem fs = FileSystem.get(listOfTablesFile.toUri(), conf);
+      FileStatus fileStatus = fs.getFileStatus(listOfTablesFile);
+      if (fileStatus == null) {
+        throw new SemanticException("Table list file not found: " + listOfTablesFile);
+      }
+      is = fs.open(listOfTablesFile);
+      list.addAll(IOUtils.readLines(is, Charset.defaultCharset()));
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    } finally {
+      if (is != null) {
+        try {
+          is.close();
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+    }
+    return list;
+  }
+
   public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet,
       String sourceClusterName, String targetClusterName, String sourceFsEndpoint, String targetFsEndpoint) {
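The two format strings compose: QUALIFIED_NAME_HIVE_TABLE_FORMAT builds "db.table", and the existing two-argument getQualifiedName then applies QUALIFIED_NAME_FORMAT (it evidently also lower-cases the result, which is why the test below expects "srcdb.t1@src" for source DB "srcDb"). A standalone sketch of the composition:

  String db = "srcdb";
  String table = "t1";
  String cluster = "src";
  String qualifiedTable = String.format("%s.%s", db, table);              // "srcdb.t1"
  String qualifiedName = String.format("%s@%s", qualifiedTable, cluster); // "srcdb.t1@src"
  // Matches Atlas's hive_table qualifiedName convention: <db>.<table>@<cluster>

One reviewer note on getFileAsList: concrete FileSystem implementations throw FileNotFoundException from getFileStatus rather than returning null, so the null check is defensive; a missing list file still surfaces as a SemanticException via the IOException catch.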
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index 99a51ed0be..59bdc35322 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -24,7 +24,11 @@
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
@@ -56,7 +60,11 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -68,7 +76,9 @@
  * Unit test class for testing Atlas metadata Dump.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class})
+@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class,
+    FileSystem.class, IOUtils.class})
+
 public class TestAtlasDumpTask {
 
   @Mock
@@ -242,6 +252,30 @@ public void testAtlasClientTimeouts() throws Exception {
       AtlasRestClientBuilder.ATLAS_PROPERTY_READ_TIMEOUT_IN_MS));
   }
 
+  @Test
+  public void testCreateExportRequest() throws Exception {
+    mockStatic(FileSystem.class);
+    FileSystem fs = mock(FileSystem.class);
+    FileStatus fileStatus = mock(FileStatus.class);
+    when(FileSystem.get(Mockito.any(URI.class), Mockito.any(HiveConf.class))).thenReturn(fs);
+    when(fs.getFileStatus(Mockito.any(Path.class))).thenReturn(fileStatus);
+    mockStatic(IOUtils.class);
+    List<String> listOfTable = Arrays.asList("t1", "t2");
+    when(IOUtils.readLines(Mockito.any(InputStream.class), Mockito.any(Charset.class))).thenReturn(listOfTable);
+    AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+    AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:31000", "srcDb", "tgtDb",
+        "src", "tgt", new Path("/tmp/staging"), new Path("/tmp/list"), conf);
+    AtlasExportRequest atlasExportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo);
+    List<AtlasObjectId> itemsToExport = atlasExportRequest.getItemsToExport();
+    Assert.assertEquals(2, itemsToExport.size());
+    Assert.assertEquals(AtlasRequestBuilder.ATLAS_TYPE_HIVE_TABLE, itemsToExport.get(0).getTypeName());
+    Assert.assertEquals("srcdb.t1@src", itemsToExport.get(0).getUniqueAttributes().get(
+        AtlasRequestBuilder.ATTRIBUTE_QUALIFIED_NAME));
+    Assert.assertEquals(AtlasRequestBuilder.ATLAS_TYPE_HIVE_TABLE, itemsToExport.get(1).getTypeName());
+    Assert.assertEquals("srcdb.t2@src", itemsToExport.get(1).getUniqueAttributes().get(
+        AtlasRequestBuilder.ATTRIBUTE_QUALIFIED_NAME));
+  }
+
   private void setupConfForRetry() {
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(60L);
     when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(10L);
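In the new test, the PowerMock statics stand in for the table list file that the Hive dump writes; the only format the builder assumes is one table name per line. If you wanted to exercise getFileAsList against a real file instead of mocked statics, a sketch (local path and contents are illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  FileSystem fs = FileSystem.getLocal(new Configuration());
  Path tableList = new Path("/tmp/list");
  try (FSDataOutputStream out = fs.create(tableList, true)) {
    out.writeBytes("t1\nt2\n"); // one table name per line, as getFileAsList expects
  }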
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
index 826935b187..0024fa5e26 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasLoadTask.java
@@ -58,7 +58,7 @@ public void testAtlasLoadMetrics() throws Exception {
     Logger logger = Mockito.mock(Logger.class);
     Whitebox.setInternalState(ReplState.class, logger);
     AtlasReplInfo atlasReplInfo = new AtlasReplInfo("http://localhost:21000/atlas", "srcDB",
-        "tgtDB", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), conf);
+        "tgtDB", "srcCluster", "tgtCluster", new Path("hdfs://tmp"), null, conf);
     atlasReplInfo.setSrcFsUri("hdfs://srcFsUri:8020");
     atlasReplInfo.setTgtFsUri("hdfs:tgtFsUri:8020");
     Mockito.doReturn(atlasReplInfo).when(atlasLoadTaskSpy).createAtlasReplInfo();
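End to end, both the dump and load sides resolve cluster names from the same two settings; a minimal session sketch (values are illustrative):

  HiveConf conf = new HiveConf();
  conf.set(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "src");
  conf.set(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "tgt");
  // With a table-level replication policy, ReplDumpTask points AtlasDumpWork at
  // <dumpRoot>/<REPL_HIVE_BASE_DIR>/<REPL_TABLE_LIST_DIR_NAME>/<dbname>, and the Atlas
  // export request then carries one hive_table AtlasObjectId per listed table instead
  // of a single hive_db id. AtlasLoadTask passes null, so the import side is unchanged.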