diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index be83489cb3..0115d079bf 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -462,6 +462,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
     REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d",
         new TimeValidator(TimeUnit.DAYS),
         "TTL of dump dirs before cleanup."),
+    REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
+        "Indicates whether the replication dump should contain only metadata information or data + metadata."),
+    REPL_DUMP_INCLUDE_ACID_TABLES("hive.repl.dump.include.acid.tables", false,
+        "Indicates if repl dump should include information about ACID tables. This has to be used \n"
+            + "in conjunction with 'hive.repl.dump.metadata.only' to enable copying of metadata for \n"
+            + "acid tables which do not require the corresponding transaction semantics to be applied on target. \n"
+            + "This can be removed when ACID table replication is supported."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index aa2c3bb460..4ac17cdf62 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -41,6 +41,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -59,13 +60,14 @@ Licensed to the Apache Software Foundation (ASF) under one
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica;
+  private static MiniDFSCluster miniDFSCluster;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
-    MiniDFSCluster miniDFSCluster =
+    miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     primary = new WarehouseInstance(LOG, miniDFSCluster);
     replica = new WarehouseInstance(LOG, miniDFSCluster);
 
@@ -279,4 +281,99 @@ public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable {
         .verifyResults(Arrays.asList("india", "australia", "russia", "uk", "us", "france", "japan",
             "china"));
   }
+
+  @Test
+  public void testMetadataBootstrapDump() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table acid_table (key int, value int) partitioned by (load_date date) "
+            + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i int, j int)")
+        .run("insert into table1 values (1,2)")
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
+            "'hive.repl.dump.include.acid.tables'='true'"));
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "acid_table", "table1" })
+        .run("select * from table1")
+        .verifyResults(Collections.emptyList());
+  }
+
+  @Test
+  public void testIncrementalMetadataReplication() throws Throwable {
+    //////////// Bootstrap ////////////
+    WarehouseInstance.Tuple bootstrapTuple = primary
+        .run("use " + primaryDbName)
+        .run("create table table1 (i int, j int)")
+        .run("create table table2 (a int, city string) partitioned by (country string)")
+        .run("create table table3 (i int, j int)")
+        .run("insert into table1 values (1,2)")
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
+            "'hive.repl.dump.include.acid.tables'='true'"));
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "table1", "table2", "table3" })
+        .run("select * from table1")
+        .verifyResults(Collections.emptyList());
+
+    //////////// First Incremental ////////////
+    WarehouseInstance.Tuple incrementalOneTuple =
+        primary
+            .run("use " + primaryDbName)
+            .run("alter table table1 rename to renamed_table1")
+            .run("insert into table2 partition(country='india') values (1,'mumbai') ")
+            .run("create table table4 (i int, j int)")
+            .dump(
+                "repl dump " + primaryDbName + " from " + bootstrapTuple.lastReplicationId + " to "
+                    + Long.parseLong(bootstrapTuple.lastReplicationId) + 100L + " limit 100 "
+                    + "with ('hive.repl.dump.metadata.only'='true')"
+            );
+
+    replica.load(replicatedDbName, incrementalOneTuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
+        .run("select * from renamed_table1")
+        .verifyResults(Collections.emptyList())
+        .run("select * from table2")
+        .verifyResults(Collections.emptyList());
+
+    //////////// Second Incremental ////////////
+    WarehouseInstance.Tuple secondIncremental = primary
+        .run("alter table table2 add columns (zipcode int)")
+        .run("alter table table3 change i a string")
+        .run("alter table table3 set tblproperties('custom.property'='custom.value')")
+        .run("drop table renamed_table1")
+        .dump("repl dump " + primaryDbName + " from " + incrementalOneTuple.lastReplicationId
+            + " with ('hive.repl.dump.metadata.only'='true')"
+        );
+
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "table2", "table3", "table4" })
+        .run("desc table3")
+        .verifyResults(new String[] {
+            "a \tstring \t ",
+            "j \tint \t "
+        })
+        .run("desc table2")
+        .verifyResults(new String[] {
+            "a \tint \t ",
+            "city \tstring \t ",
+            "country \tstring \t ",
+            "zipcode \tint \t ",
+            "\t \t ",
+            "# Partition Information\t \t ",
+            "# col_name \tdata_type \tcomment ",
+            "country \tstring \t ",
+        })
+        .run("show tblproperties table3('custom.property')")
+        .verifyResults(new String[] {
+            "custom.value\t "
+        });
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 061817fc7f..aa08199152 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -47,6 +47,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -178,16 +179,28 @@ WarehouseInstance run(String command) throws Throwable {
     return this;
   }
 
-  Tuple dump(String dbName, String lastReplicationId) throws Throwable {
-    advanceDumpDir();
+  Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
+      throws Throwable {
     String dumpCommand =
         "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId);
+    if (!withClauseOptions.isEmpty()) {
+      dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    return dump(dumpCommand);
+  }
+
+  Tuple dump(String dumpCommand) throws Throwable {
+    advanceDumpDir();
     run(dumpCommand);
     String dumpLocation = row0Result(0, false);
     String lastDumpId = row0Result(1, true);
     return new Tuple(dumpLocation, lastDumpId);
   }
 
+  Tuple dump(String dbName, String lastReplicationId) throws Throwable {
+    return dump(dbName, lastReplicationId, Collections.emptyList());
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable {
     run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
     printOutput();
@@ -211,10 +224,15 @@ WarehouseInstance verifyResults(String[] data) throws IOException {
     List results = getOutput();
     logger.info("Expecting {}", StringUtils.join(data, ","));
     logger.info("Got {}", results);
-    assertEquals(data.length, results.size());
-    for (int i = 0; i < data.length; i++) {
-      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
-    }
+    List<String> filteredResults = results.stream().filter(
+        x -> !x.toLowerCase()
+            .contains(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()))
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> lowerCaseData =
+        Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList());
+    assertEquals(data.length, filteredResults.size());
+    assertTrue(filteredResults.containsAll(lowerCaseData));
     return this;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index eade36f3cd..639f319d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -177,8 +177,10 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Ex
     replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString());
   }
 
-  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
-    ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString());
+  private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
+    ReplicationSpec rspec =
+        getNewReplicationSpec(eventId.toString(), eventId.toString(), conf.getBoolean(
+            HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, false));
     rspec.setReplSpecType(ReplicationSpec.Type.INCREMENTAL_DUMP);
     return rspec;
   }
@@ -269,8 +271,9 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
     }
   }
 
-  private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
-    return new ReplicationSpec(true, false, evState, objState, false, true, true);
+  private ReplicationSpec getNewReplicationSpec(String evState, String objState,
+      boolean isMetadataOnly) {
+    return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true);
   }
 
   private String getNextDumpDir() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 4ab7312b7e..b3ac75b5f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
@@ -261,16 +262,15 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database
     }
   }
 
-  public static void createExportDump(FileSystem fs, Path metadataPath,
-      org.apache.hadoop.hive.ql.metadata.Table tableHandle,
-      Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions,
-      ReplicationSpec replicationSpec) throws SemanticException, IOException {
+  public static void createExportDump(FileSystem fs, Path metadataPath, Table tableHandle,
+      Iterable<Partition> partitions, ReplicationSpec replicationSpec, HiveConf hiveConf)
+      throws SemanticException, IOException {
 
-    if (replicationSpec == null){
+    if (replicationSpec == null) {
       replicationSpec = new ReplicationSpec(); // instantiate default values if not specified
     }
 
-    if (tableHandle == null){
+    if (tableHandle == null) {
       replicationSpec.setNoop(true);
     }
 
@@ -278,7 +278,7 @@ public static void createExportDump(FileSystem fs, Path metadataPath,
       if (replicationSpec.isInReplicationScope()) {
         new ReplicationSpecSerializer().writeTo(writer, replicationSpec);
       }
-      new TableSerializer(tableHandle, partitions).writeTo(writer, replicationSpec);
+      new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec);
     }
   }
 
@@ -408,24 +408,21 @@ public boolean accept(Path p) {
   /**
    * Verify if a table should be exported or not
    */
-  public static Boolean shouldExportTable(ReplicationSpec replicationSpec, Table tableHandle) throws SemanticException {
-    if (replicationSpec == null)
-    {
+  private static Boolean shouldExportTable(ReplicationSpec replicationSpec, Table tableHandle) throws SemanticException {
+    if (replicationSpec == null) {
       replicationSpec = new ReplicationSpec();
     }
 
-    if (replicationSpec.isNoop())
-    {
+    if (replicationSpec.isNoop()) {
      return false;
    }
 
-    if (tableHandle == null)
-    {
+    if (tableHandle == null) {
       return false;
     }
 
     if (replicationSpec.isInReplicationScope()) {
-      return !(tableHandle == null || tableHandle.isTemporary() || tableHandle.isNonNative() ||
+      return !(tableHandle.isTemporary() || tableHandle.isNonNative() ||
           (tableHandle.getParameters() != null &&
               StringUtils.equals(tableHandle.getParameters().get("transactional"), "true")));
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 209f75de9e..3360c0e53d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -879,7 +879,8 @@ replDumpStatement
       (KW_TO (rangeEnd=Number))?
       (KW_LIMIT (batchSize=Number))?
     )?
-    -> ^(TOK_REPL_DUMP $dbName $tblName? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? )
+    (KW_WITH replConf=replConfigs)?
+    -> ^(TOK_REPL_DUMP $dbName ^(TOK_TABNAME $tblName)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
     ;
 
 replLoadStatement
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
index 7a28b433af..197f2194f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
@@ -21,6 +21,7 @@
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.slf4j.Logger;
@@ -86,7 +87,8 @@ private void export_meta_data(PreDropTableEvent tableEvent) throws MetaException
     Path outFile = new Path(metaPath, name + EximUtil.METADATA_NAME);
     try {
       SessionState.getConsole().printInfo("Beginning metadata export");
-      EximUtil.createExportDump(fs, outFile, mTbl, null, null);
+      EximUtil.createExportDump(fs, outFile, mTbl, null, null,
+          new HiveConf(conf, MetaDataExportListener.class));
       if (moveMetadataToTrash == true) {
         wh.deleteDir(metaPath, true);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 80556aec88..9c53b3c212 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -128,9 +128,18 @@ private void initReplDump(ASTNode ast) {
     // skip the first node, which is always required
     int currNode = 1;
     while (currNode < numChildren) {
-      if (ast.getChild(currNode).getType() != TOK_FROM) {
+      if (ast.getChild(currNode).getType() == TOK_REPL_CONFIG) {
+        Map<String, String> replConfigs
+            = DDLSemanticAnalyzer.getProps((ASTNode) ast.getChild(currNode).getChild(0));
+        if (null != replConfigs) {
+          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+            conf.set(config.getKey(), config.getValue());
+          }
+        }
+      }
+      else if (ast.getChild(currNode).getType() == TOK_TABNAME) {
         // optional tblName was specified.
-        tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText());
+        tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getChild(0).getText());
       } else {
         // TOK_FROM subtree
         Tree fromNode = ast.getChild(currNode);
@@ -152,8 +161,6 @@ private void initReplDump(ASTNode ast) {
         // move to the next child in FROM tree
         numChild++;
       }
-      // FROM node is always the last
-      break;
     }
     // move to the next root node
     currNode++;
@@ -176,7 +183,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
             ErrorMsg.INVALID_PATH.getMsg(ast),
             maxEventLimit,
             ctx.getResFile().toUri().toString()
-        ), conf);
+        ), conf, true);
     rootTasks.add(replDumpWorkTask);
     if (dbNameOrPattern != null) {
       for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index a44f98f444..6b56362462 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,6 +26,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -34,7 +35,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,16 +60,16 @@ Licensed to the Apache Software Foundation (ASF) under one
   private final HiveConf conf;
   private final Paths paths;
 
-  public TableExport(Paths paths, TableSpec tableSpec,
-      ReplicationSpec replicationSpec, Hive db, String distCpDoAsUser, HiveConf conf)
-      throws SemanticException {
+  public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db,
+      String distCpDoAsUser, HiveConf conf) {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
         && replicationSpec.isInReplicationScope())
         ? null
         : tableSpec;
     this.replicationSpec = replicationSpec;
-    if (this.tableSpec != null && this.tableSpec.tableHandle.isView()) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || (this.tableSpec != null
+        && this.tableSpec.tableHandle.isView())) {
       this.replicationSpec.setIsMetadataOnly(true);
     }
     this.db = db;
@@ -82,10 +82,6 @@ public void write() throws SemanticException {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
-      //first we should get the correct replication spec before doing metadata/data export
-      if (tableSpec.tableHandle.isView()) {
-        replicationSpec.setIsMetadataOnly(true);
-      }
       PartitionIterable withPartitions = getPartitions();
       writeMetaData(withPartitions);
       if (!replicationSpec.isMetadataOnly()) {
@@ -130,7 +126,8 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio
           paths.metaDataExportFile(),
           tableSpec == null ? null : tableSpec.tableHandle,
           partitions,
-          replicationSpec);
+          replicationSpec,
+          conf);
       logger.debug("_metadata file written into " + paths.metaDataExportFile().toString());
     } catch (Exception e) {
       // the path used above should not be used on a second try as each dump request is written to a unique location.
@@ -159,8 +156,12 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
     }
   }
 
-  private boolean shouldExport() throws SemanticException {
-    return EximUtil.shouldExportTable(replicationSpec, tableSpec.tableHandle);
+  private boolean shouldExport() {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES)
+        && AcidUtils.isAcidTable(tableSpec.tableHandle)) {
+      return true;
+    }
+    return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf);
   }
 
   /**
@@ -172,7 +173,7 @@ private boolean shouldExport() throws SemanticException {
     private final HiveConf conf;
     private final Path exportRootDir;
     private final FileSystem exportFileSystem;
-    private boolean writeData = true;
+    private boolean writeData;
 
     public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf,
         boolean shouldWriteData) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 97f0e0befe..abcb7e3af9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -22,8 +22,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.io.IOUtils;
@@ -152,4 +155,28 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) thro
     }
     return false;
   }
+
+  /**
+   * validates if a table can be exported, similar to EximUtil.shouldExport with a few replication
+   * specific checks.
+   */
+  public static Boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle,
+      HiveConf hiveConf) {
+    if (replicationSpec == null) {
+      replicationSpec = new ReplicationSpec();
+    }
+
+    if (replicationSpec.isNoop() || tableHandle == null) {
+      return false;
+    }
+
+    if (replicationSpec.isInReplicationScope()) {
+      boolean isAcidTable = AcidUtils.isAcidTable(tableHandle);
+      if (isAcidTable) {
+        return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES);
+      }
+      return !(tableHandle.isTemporary() || tableHandle.isNonNative());
+    }
+    return false;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index def83842ee..5d6abde8f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -28,14 +26,15 @@
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
-import javax.annotation.Nullable;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.Iterator;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 class AddPartitionHandler extends AbstractEventHandler {
   protected AddPartitionHandler(NotificationEvent notificationEvent) {
@@ -54,7 +53,7 @@ public void handle(Context withinContext) throws Exception {
     }
 
     final Table qlMdTable = new Table(tobj);
-    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
     }
 
@@ -64,23 +63,16 @@
       return;
     }
 
-    Iterable<Partition> qlPtns = Iterables.transform(
-        ptns,
-        new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
-          @Nullable
-          @Override
-          public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
-            if (input == null) {
-              return null;
-            }
-            try {
-              return new Partition(qlMdTable, input);
-            } catch (HiveException e) {
-              throw new IllegalArgumentException(e);
-            }
-          }
-        }
-    );
+    Iterable<Partition> qlPtns = StreamSupport.stream(ptns.spliterator(), false).map(input -> {
+      if (input == null) {
+        return null;
+      }
+      try {
+        return new Partition(qlMdTable, input);
+      } catch (HiveException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }).collect(Collectors.toList());
 
     Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
     EximUtil.createExportDump(
@@ -88,7 +80,8 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition
         metaDataPath,
         qlMdTable,
         qlPtns,
-        withinContext.replicationSpec);
+        withinContext.replicationSpec,
+        withinContext.hiveConf);
 
     Iterator partitionFilesIter = apm.getPartitionFilesIter().iterator();
     for (Partition qlPtn : qlPtns) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index 58df6650fd..cde4eed986 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -30,6 +30,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 class AlterPartitionHandler extends AbstractEventHandler {
@@ -88,7 +89,7 @@ public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage());
 
     Table qlMdTable = new Table(tableObject);
-    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
     }
 
@@ -102,7 +103,8 @@ public void handle(Context withinContext) throws Exception {
           metaDataPath,
           qlMdTable,
           partitions,
-          withinContext.replicationSpec);
+          withinContext.replicationSpec,
+          withinContext.hiveConf);
     }
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(event.getMessage());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index 4e3ce0e8cb..5f582b32d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 class AlterTableHandler extends AbstractEventHandler {
@@ -80,7 +81,8 @@ public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage());
 
     Table qlMdTableBefore = new Table(before);
-    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTableBefore)) {
+    if (!Utils
+        .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, withinContext.hiveConf)) {
       return;
     }
 
@@ -89,11 +91,12 @@ public void handle(Context withinContext) throws Exception {
       Table qlMdTableAfter = new Table(after);
       Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
       EximUtil.createExportDump(
-        metaDataPath.getFileSystem(withinContext.hiveConf),
-        metaDataPath,
-        qlMdTableAfter,
-        null,
-        withinContext.replicationSpec);
+          metaDataPath.getFileSystem(withinContext.hiveConf),
+          metaDataPath,
+          qlMdTableAfter,
+          null,
+          withinContext.replicationSpec,
+          withinContext.hiveConf);
     }
 
     DumpMetaData dmd = withinContext.createDmd(this);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index ef6f340013..3804396626 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -48,7 +49,7 @@ public void handle(Context withinContext) throws Exception {
 
     Table qlMdTable = new Table(tobj);
 
-    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
     }
 
@@ -62,7 +63,8 @@ public void handle(Context withinContext) throws Exception {
         metaDataPath,
         qlMdTable,
         null,
-        withinContext.replicationSpec);
+        withinContext.replicationSpec,
+        withinContext.hiveConf);
 
     Path dataPath = new Path(withinContext.eventRoot, "data");
     Iterable files = ctm.getFiles();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index df852a3dce..5ac3af0c30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -19,11 +19,13 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 import java.io.BufferedWriter;
@@ -41,10 +43,13 @@
 
   @Override
   public void handle(Context withinContext) throws Exception {
+    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+      return;
+    }
     InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage());
     org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg);
 
-    if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
       return;
     }
 
@@ -58,7 +63,8 @@ public void handle(Context withinContext) throws Exception {
     withinContext.replicationSpec.setIsReplace(insertMsg.isReplace());
     EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
         qlMdTable, qlPtns,
-        withinContext.replicationSpec);
+        withinContext.replicationSpec,
+        withinContext.hiveConf);
 
     Iterable files = insertMsg.getFiles();
 
     if (files != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index c3a70cc8f9..143808bb85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
@@ -35,17 +36,19 @@
   public static final String FIELD_NAME = "table";
   private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
   private final Iterable<Partition> partitions;
+  private final HiveConf hiveConf;
 
   public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle,
-      Iterable<Partition> partitions) {
+      Iterable<Partition> partitions, HiveConf hiveConf) {
     this.tableHandle = tableHandle;
     this.partitions = partitions;
+    this.hiveConf = hiveConf;
   }
 
   @Override
   public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
-    if (!EximUtil.shouldExportTable(additionalPropertiesProvider, tableHandle)) {
+    if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, hiveConf)) {
      return;
    }
 
@@ -62,8 +65,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi
     }
   }
 
-  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider)
-      throws SemanticException, IOException {
+  private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) {
     if (additionalPropertiesProvider.isInReplicationScope()) {
       // Current replication state must be set on the Table object only for bootstrap dump.
       // Event replication State will be null in case of bootstrap dump.