diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index be83489cb3..bffcfc3d47 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -462,6 +462,14 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     REPL_DUMPDIR_TTL("hive.repl.dumpdir.ttl", "7d",
         new TimeValidator(TimeUnit.DAYS),
         "TTL of dump dirs before cleanup."),
+    REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
+        "Indicates whether replication dump only metadata information or data + metadata."),
+    REPL_DUMP_INCLUDE_ACID_TABLES("hive.repl.dump.include.acid.tables", false,
+        "Indicates if repl dump should include information about ACID tables. It should be \n"
+            + "used in conjunction with 'hive.repl.dump.metadata.only' to enable copying of \n"
+            + "metadata for acid tables which do not require the corresponding transaction \n"
+            + "semantics to be applied on target. This can be removed when ACID table \n"
+            + "replication is supported."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index aa2c3bb460..4ac17cdf62 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -41,6 +41,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -59,13 +60,14 @@ Licensed to the Apache Software Foundation (ASF) under one
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica;
+  private static MiniDFSCluster miniDFSCluster;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
-    MiniDFSCluster miniDFSCluster =
+    miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
     primary = new WarehouseInstance(LOG, miniDFSCluster);
     replica = new WarehouseInstance(LOG, miniDFSCluster);
 
@@ -279,4 +281,99 @@ public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable {
         .verifyResults(Arrays.asList("india", "australia", "russia", "uk", "us", "france", "japan",
             "china"));
   }
+
+  @Test
+  public void testMetadataBootstrapDump() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table acid_table (key int, value int) partitioned by (load_date date) "
+            + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
+        .run("create table table1 (i int, j int)")
+        .run("insert into table1 values (1,2)")
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
+            "'hive.repl.dump.include.acid.tables'='true'"));
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "acid_table", "table1" })
+        .run("select * from table1")
+        .verifyResults(Collections.emptyList());
+  }
+
+  @Test
+  public void testIncrementalMetadataReplication() throws Throwable {
+    //////////// Bootstrap ////////////
+    WarehouseInstance.Tuple bootstrapTuple = primary
+        .run("use " + primaryDbName)
+        .run("create table table1 (i int, j int)")
+        .run("create table table2 (a int, city string) partitioned by (country string)")
+        .run("create table table3 (i int, j int)")
+        .run("insert into table1 values (1,2)")
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
+            "'hive.repl.dump.include.acid.tables'='true'"));
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "table1", "table2", "table3" })
+        .run("select * from table1")
+        .verifyResults(Collections.emptyList());
+
+    //////////// First Incremental ////////////
+    WarehouseInstance.Tuple incrementalOneTuple =
+        primary
+            .run("use " + primaryDbName)
+            .run("alter table table1 rename to renamed_table1")
+            .run("insert into table2 partition(country='india') values (1,'mumbai') ")
+            .run("create table table4 (i int, j int)")
+            .dump(
+                "repl dump " + primaryDbName + " from " + bootstrapTuple.lastReplicationId + " to "
+                    + Long.parseLong(bootstrapTuple.lastReplicationId) + 100L + " limit 100 "
+                    + "with ('hive.repl.dump.metadata.only'='true')"
+            );
+
+    replica.load(replicatedDbName, incrementalOneTuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
+        .run("select * from renamed_table1")
+        .verifyResults(Collections.emptyList())
+        .run("select * from table2")
+        .verifyResults(Collections.emptyList());
+
+    //////////// Second Incremental ////////////
+    WarehouseInstance.Tuple secondIncremental = primary
+        .run("alter table table2 add columns (zipcode int)")
+        .run("alter table table3 change i a string")
+        .run("alter table table3 set tblproperties('custom.property'='custom.value')")
+        .run("drop table renamed_table1")
+        .dump("repl dump " + primaryDbName + " from " + incrementalOneTuple.lastReplicationId
+            + " with ('hive.repl.dump.metadata.only'='true')"
+        );
+
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+        .run("use " +
replicatedDbName) + .run("show tables") + .verifyResults(new String[] { "table2", "table3", "table4" }) + .run("desc table3") + .verifyResults(new String[] { + "a \tstring \t ", + "j \tint \t " + }) + .run("desc table2") + .verifyResults(new String[] { + "a \tint \t ", + "city \tstring \t ", + "country \tstring \t ", + "zipcode \tint \t ", + "\t \t ", + "# Partition Information\t \t ", + "# col_name \tdata_type \tcomment ", + "country \tstring \t ", + }) + .run("show tblproperties table3('custom.property')") + .verifyResults(new String[] { + "custom.value\t " + }); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 061817fc7f..aa08199152 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -47,6 +47,7 @@ Licensed to the Apache Software Foundation (ASF) under one import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -178,16 +179,28 @@ WarehouseInstance run(String command) throws Throwable { return this; } - Tuple dump(String dbName, String lastReplicationId) throws Throwable { - advanceDumpDir(); + Tuple dump(String dbName, String lastReplicationId, List withClauseOptions) + throws Throwable { String dumpCommand = "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId); + if (!withClauseOptions.isEmpty()) { + dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return dump(dumpCommand); + } + + Tuple dump(String dumpCommand) throws Throwable { + advanceDumpDir(); run(dumpCommand); String dumpLocation = row0Result(0, false); String lastDumpId = row0Result(1, true); return new Tuple(dumpLocation, lastDumpId); } + Tuple dump(String dbName, String lastReplicationId) throws Throwable { + return dump(dbName, lastReplicationId, Collections.emptyList()); + } + WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable { run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); printOutput(); @@ -211,10 +224,15 @@ WarehouseInstance verifyResults(String[] data) throws IOException { List results = getOutput(); logger.info("Expecting {}", StringUtils.join(data, ",")); logger.info("Got {}", results); - assertEquals(data.length, results.size()); - for (int i = 0; i < data.length; i++) { - assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase()); - } + List filteredResults = results.stream().filter( + x -> !x.toLowerCase() + .contains(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())) + .map(String::toLowerCase) + .collect(Collectors.toList()); + List lowerCaseData = + Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList()); + assertEquals(data.length, filteredResults.size()); + assertTrue(filteredResults.containsAll(lowerCaseData)); return this; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index eade36f3cd..639f319d7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -177,8 +177,10 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path 
cmRoot) throws Ex replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); } - private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException { - ReplicationSpec rspec = getNewReplicationSpec(eventId.toString(), eventId.toString()); + private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) { + ReplicationSpec rspec = + getNewReplicationSpec(eventId.toString(), eventId.toString(), conf.getBoolean( + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, false)); rspec.setReplSpecType(ReplicationSpec.Type.INCREMENTAL_DUMP); return rspec; } @@ -269,8 +271,9 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except } } - private ReplicationSpec getNewReplicationSpec(String evState, String objState) { - return new ReplicationSpec(true, false, evState, objState, false, true, true); + private ReplicationSpec getNewReplicationSpec(String evState, String objState, + boolean isMetadataOnly) { + return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true); } private String getNextDumpDir() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 4ab7312b7e..9a1dc97d04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer; @@ -261,16 +262,15 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database } } - public static void createExportDump(FileSystem fs, Path metadataPath, - org.apache.hadoop.hive.ql.metadata.Table tableHandle, - Iterable partitions, - ReplicationSpec replicationSpec) throws SemanticException, IOException { + public static void createExportDump(FileSystem fs, Path metadataPath, Table tableHandle, + Iterable partitions, ReplicationSpec replicationSpec, HiveConf hiveConf) + throws SemanticException, IOException { - if (replicationSpec == null){ + if (replicationSpec == null) { replicationSpec = new ReplicationSpec(); // instantiate default values if not specified } - if (tableHandle == null){ + if (tableHandle == null) { replicationSpec.setNoop(true); } @@ -278,7 +278,7 @@ public static void createExportDump(FileSystem fs, Path metadataPath, if (replicationSpec.isInReplicationScope()) { new ReplicationSpecSerializer().writeTo(writer, replicationSpec); } - new TableSerializer(tableHandle, partitions).writeTo(writer, replicationSpec); + new TableSerializer(tableHandle, partitions, hiveConf).writeTo(writer, replicationSpec); } } @@ -404,50 +404,4 @@ public boolean accept(Path p) { } }; } - - /** - * Verify if a table should be exported or not - */ - public static Boolean shouldExportTable(ReplicationSpec replicationSpec, Table tableHandle) throws SemanticException { - if (replicationSpec == null) - { - replicationSpec = new ReplicationSpec(); - } - - if (replicationSpec.isNoop()) - { - return false; - } - - if (tableHandle == null) - { - return false; - } - - if (replicationSpec.isInReplicationScope()) { - return !(tableHandle == null || tableHandle.isTemporary() || tableHandle.isNonNative() || - 
(tableHandle.getParameters() != null && StringUtils.equals(tableHandle.getParameters().get("transactional"), "true"))); - } - - if (tableHandle.isNonNative()) { - throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); - } - - return true; - } - - /** - * Verify if a table should be exported or not by talking to metastore to fetch table info. - * Return true when running into errors with metastore call. - */ - public static Boolean tryValidateShouldExportTable(Hive db, String dbName, String tableName, ReplicationSpec replicationSpec) { - try { - Table table = db.getTable(dbName, tableName); - return EximUtil.shouldExportTable(replicationSpec, table); - } catch (Exception e) { - // Swallow the exception - LOG.error("Failed to validate if the table should be exported or not", e); - } - return true; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 209f75de9e..3360c0e53d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -879,7 +879,8 @@ replDumpStatement (KW_TO (rangeEnd=Number))? (KW_LIMIT (batchSize=Number))? )? - -> ^(TOK_REPL_DUMP $dbName $tblName? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? ) + (KW_WITH replConf=replConfigs)? + -> ^(TOK_REPL_DUMP $dbName ^(TOK_TABNAME $tblName)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?) ; replLoadStatement diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java index 7a28b433af..197f2194f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java @@ -21,6 +21,7 @@ import java.text.SimpleDateFormat; import java.util.Date; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.slf4j.Logger; @@ -86,7 +87,8 @@ private void export_meta_data(PreDropTableEvent tableEvent) throws MetaException Path outFile = new Path(metaPath, name + EximUtil.METADATA_NAME); try { SessionState.getConsole().printInfo("Beginning metadata export"); - EximUtil.createExportDump(fs, outFile, mTbl, null, null); + EximUtil.createExportDump(fs, outFile, mTbl, null, null, + new HiveConf(conf, MetaDataExportListener.class)); if (moveMetadataToTrash == true) { wh.deleteDir(metaPath, true); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 80556aec88..b8f7299518 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -34,13 +34,13 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; -import 
org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; -import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; @@ -59,7 +59,6 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.Map; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; -import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP; @@ -128,9 +127,17 @@ private void initReplDump(ASTNode ast) { // skip the first node, which is always required int currNode = 1; while (currNode < numChildren) { - if (ast.getChild(currNode).getType() != TOK_FROM) { + if (ast.getChild(currNode).getType() == TOK_REPL_CONFIG) { + Map replConfigs + = DDLSemanticAnalyzer.getProps((ASTNode) ast.getChild(currNode).getChild(0)); + if (null != replConfigs) { + for (Map.Entry config : replConfigs.entrySet()) { + conf.set(config.getKey(), config.getValue()); + } + } + } else if (ast.getChild(currNode).getType() == TOK_TABNAME) { // optional tblName was specified. - tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText()); + tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getChild(0).getText()); } else { // TOK_FROM subtree Tree fromNode = ast.getChild(currNode); @@ -152,8 +159,6 @@ private void initReplDump(ASTNode ast) { // move to the next child in FROM tree numChild++; } - // FROM node is always the last - break; } // move to the next root node currNode++; @@ -176,7 +181,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { ErrorMsg.INVALID_PATH.getMsg(ast), maxEventLimit, ctx.getResFile().toUri().toString() - ), conf); + ), conf, true); rootTasks.add(replDumpWorkTask); if (dbNameOrPattern != null) { for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index a44f98f444..6b56362462 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -26,6 +26,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -34,7 +35,6 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,16 +60,16 @@ Licensed to the Apache Software Foundation (ASF) under 
one private final HiveConf conf; private final Paths paths; - public TableExport(Paths paths, TableSpec tableSpec, - ReplicationSpec replicationSpec, Hive db, String distCpDoAsUser, HiveConf conf) - throws SemanticException { + public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, + String distCpDoAsUser, HiveConf conf) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) ? null : tableSpec; this.replicationSpec = replicationSpec; - if (this.tableSpec != null && this.tableSpec.tableHandle.isView()) { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || (this.tableSpec != null + && this.tableSpec.tableHandle.isView())) { this.replicationSpec.setIsMetadataOnly(true); } this.db = db; @@ -82,10 +82,6 @@ public void write() throws SemanticException { if (tableSpec == null) { writeMetaData(null); } else if (shouldExport()) { - //first we should get the correct replication spec before doing metadata/data export - if (tableSpec.tableHandle.isView()) { - replicationSpec.setIsMetadataOnly(true); - } PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly()) { @@ -130,7 +126,8 @@ private void writeMetaData(PartitionIterable partitions) throws SemanticExceptio paths.metaDataExportFile(), tableSpec == null ? null : tableSpec.tableHandle, partitions, - replicationSpec); + replicationSpec, + conf); logger.debug("_metadata file written into " + paths.metaDataExportFile().toString()); } catch (Exception e) { // the path used above should not be used on a second try as each dump request is written to a unique location. @@ -159,8 +156,12 @@ private void writeData(PartitionIterable partitions) throws SemanticException { } } - private boolean shouldExport() throws SemanticException { - return EximUtil.shouldExportTable(replicationSpec, tableSpec.tableHandle); + private boolean shouldExport() { + if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES) + && AcidUtils.isAcidTable(tableSpec.tableHandle)) { + return true; + } + return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf); } /** @@ -172,7 +173,7 @@ private boolean shouldExport() throws SemanticException { private final HiveConf conf; private final Path exportRootDir; private final FileSystem exportFileSystem; - private boolean writeData = true; + private boolean writeData; public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf, boolean shouldWriteData) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 97f0e0befe..602b1a97d0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -1,19 +1,19 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.ql.parse.repl.dump; @@ -21,9 +21,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.io.IOUtils; @@ -152,4 +156,46 @@ public static boolean isBootstrapDumpInProgress(Hive hiveDb, String dbName) thro } return false; } + + /** + * validates if a table can be exported, similar to EximUtil.shouldExport with few replication + * specific checks. + */ + public static Boolean shouldReplicate(ReplicationSpec replicationSpec, Table tableHandle, + HiveConf hiveConf) { + if (replicationSpec == null) { + replicationSpec = new ReplicationSpec(); + } + + if (replicationSpec.isNoop() || tableHandle == null) { + return false; + } + + if (tableHandle.isNonNative()) { + return false; + } + + if (replicationSpec.isInReplicationScope()) { + boolean isAcidTable = AcidUtils.isAcidTable(tableHandle); + if (isAcidTable) { + return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES); + } + return !tableHandle.isTemporary(); + } + return true; + } + + public static boolean shouldReplicate(NotificationEvent tableForEvent, + ReplicationSpec replicationSpec, Hive db, HiveConf hiveConf) { + Table table; + try { + table = db.getTable(tableForEvent.getDbName(), tableForEvent.getTableName()); + } catch (HiveException e) { + LOG.info( + "error while getting table info for" + tableForEvent.getDbName() + "." 
+ tableForEvent + .getTableName(), e); + return false; + } + return shouldReplicate(replicationSpec, table, hiveConf); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java new file mode 100644 index 0000000000..d7a29e36f9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; + +abstract class AbstractConstraintEventHandler extends AbstractEventHandler { + AbstractConstraintEventHandler(NotificationEvent event) { + super(event); + } + + boolean shouldReplicate(Context withinContext) { + return Utils.shouldReplicate( + event, + withinContext.replicationSpec, + withinContext.db, + withinContext.hiveConf + ); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java index d0cbd4a772..8fdf2f16a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddForeignKeyHandler.java @@ -18,28 +18,23 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddForeignKeyHandler extends AbstractEventHandler { +public class AddForeignKeyHandler extends AbstractConstraintEventHandler { AddForeignKeyHandler(NotificationEvent event) { super(event); } @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ADD_FOREIGNKEY_MESSAGE message : {}", fromEventId(), event.getMessage()); - - if (!EximUtil.tryValidateShouldExportTable(withinContext.db, event.getDbName(), event.getTableName(), withinContext.replicationSpec)) - { - return; + LOG.debug("Processing#{} ADD_FOREIGNKEY_MESSAGE message : {}", fromEventId(), + event.getMessage()); + if (shouldReplicate(withinContext)) { + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); } - - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java index aa7f4ef0a6..335d4e6af9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddNotNullConstraintHandler.java @@ -18,28 +18,24 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddNotNullConstraintHandler 
extends AbstractEventHandler { +public class AddNotNullConstraintHandler extends AbstractConstraintEventHandler { AddNotNullConstraintHandler(NotificationEvent event) { super(event); } @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ADD_NOTNULLCONSTRAINT_MESSAGE message : {}", fromEventId(), event.getMessage()); + LOG.debug("Processing#{} ADD_NOTNULLCONSTRAINT_MESSAGE message : {}", fromEventId(), + event.getMessage()); - if (!EximUtil.tryValidateShouldExportTable(withinContext.db, event.getDbName(), event.getTableName(), withinContext.replicationSpec)) - { - return; + if (shouldReplicate(withinContext)) { + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); } - - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index def83842ee..a3d80a0640 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -28,14 +26,15 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; -import javax.annotation.Nullable; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Iterator; - -import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; class AddPartitionHandler extends AbstractEventHandler { protected AddPartitionHandler(NotificationEvent notificationEvent) { @@ -54,7 +53,7 @@ public void handle(Context withinContext) throws Exception { } final Table qlMdTable = new Table(tobj); - if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { return; } @@ -64,23 +63,17 @@ public void handle(Context withinContext) throws Exception { return; } - Iterable qlPtns = Iterables.transform( - ptns, - new Function() { - @Nullable - @Override - public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { - if (input == null) { - return null; - } - try { - return new Partition(qlMdTable, input); - } catch (HiveException e) { - throw new IllegalArgumentException(e); - } + Iterable qlPtns = StreamSupport.stream(ptns.spliterator(), false).map( + input -> { + if (input == null) { + return null; } - } - ); + try { + return new Partition(qlMdTable, input); + } catch (HiveException e) { + throw new IllegalArgumentException(e); + } + }).collect(Collectors.toList()); Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); EximUtil.createExportDump( @@ -88,7 +81,8 @@ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition 
metaDataPath, qlMdTable, qlPtns, - withinContext.replicationSpec); + withinContext.replicationSpec, + withinContext.hiveConf); Iterator partitionFilesIter = apm.getPartitionFilesIter().iterator(); for (Partition qlPtn : qlPtns) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java index 344fac9df9..cf45c684a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPrimaryKeyHandler.java @@ -18,28 +18,24 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddPrimaryKeyHandler extends AbstractEventHandler { +public class AddPrimaryKeyHandler extends AbstractConstraintEventHandler { AddPrimaryKeyHandler(NotificationEvent event) { super(event); } @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ADD_PRIMARYKEY_MESSAGE message : {}", fromEventId(), event.getMessage()); + LOG.debug("Processing#{} ADD_PRIMARYKEY_MESSAGE message : {}", fromEventId(), + event.getMessage()); - if (!EximUtil.tryValidateShouldExportTable(withinContext.db, event.getDbName(), event.getTableName(), withinContext.replicationSpec)) - { - return; + if (shouldReplicate(withinContext)) { + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); } - - DumpMetaData dmd = withinContext.createDmd(this); - dmd.setPayload(event.getMessage()); - dmd.write(); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java index 4cc75a7d01..58835a0352 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddUniqueConstraintHandler.java @@ -18,28 +18,24 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; -public class AddUniqueConstraintHandler extends AbstractEventHandler { +public class AddUniqueConstraintHandler extends AbstractConstraintEventHandler { AddUniqueConstraintHandler(NotificationEvent event) { super(event); } @Override public void handle(Context withinContext) throws Exception { - LOG.info("Processing#{} ADD_UNIQUECONSTRAINT_MESSAGE message : {}", fromEventId(), event.getMessage()); + LOG.debug("Processing#{} ADD_UNIQUECONSTRAINT_MESSAGE message : {}", fromEventId(), + event.getMessage()); - if (!EximUtil.tryValidateShouldExportTable(withinContext.db, event.getDbName(), event.getTableName(), withinContext.replicationSpec)) - { - return; + if (shouldReplicate(withinContext)) { + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); } - - DumpMetaData dmd = withinContext.createDmd(this); - 
dmd.setPayload(event.getMessage()); - dmd.write(); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index 58df6650fd..cde4eed986 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; class AlterPartitionHandler extends AbstractEventHandler { @@ -88,7 +89,7 @@ public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} ALTER_PARTITION message : {}", fromEventId(), event.getMessage()); Table qlMdTable = new Table(tableObject); - if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { return; } @@ -102,7 +103,8 @@ public void handle(Context withinContext) throws Exception { metaDataPath, qlMdTable, partitions, - withinContext.replicationSpec); + withinContext.replicationSpec, + withinContext.hiveConf); } DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(event.getMessage()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java index 4e3ce0e8cb..5f582b32d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; class AlterTableHandler extends AbstractEventHandler { @@ -80,7 +81,8 @@ public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} ALTER_TABLE message : {}", fromEventId(), event.getMessage()); Table qlMdTableBefore = new Table(before); - if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTableBefore)) { + if (!Utils + .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, withinContext.hiveConf)) { return; } @@ -89,11 +91,12 @@ public void handle(Context withinContext) throws Exception { Table qlMdTableAfter = new Table(after); Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); EximUtil.createExportDump( - metaDataPath.getFileSystem(withinContext.hiveConf), - metaDataPath, - qlMdTableAfter, - null, - withinContext.replicationSpec); + metaDataPath.getFileSystem(withinContext.hiveConf), + metaDataPath, + qlMdTableAfter, + null, + withinContext.replicationSpec, + withinContext.hiveConf); } DumpMetaData dmd = withinContext.createDmd(this); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index ef6f340013..3804396626 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import 
org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import java.io.BufferedWriter; import java.io.IOException; @@ -48,7 +49,7 @@ public void handle(Context withinContext) throws Exception { Table qlMdTable = new Table(tobj); - if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { return; } @@ -62,7 +63,8 @@ public void handle(Context withinContext) throws Exception { metaDataPath, qlMdTable, null, - withinContext.replicationSpec); + withinContext.replicationSpec, + withinContext.hiveConf); Path dataPath = new Path(withinContext.eventRoot, "data"); Iterable files = ctm.getFiles(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index df852a3dce..5ac3af0c30 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -19,11 +19,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import java.io.BufferedWriter; @@ -41,10 +43,13 @@ @Override public void handle(Context withinContext) throws Exception { + if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) { + return; + } InsertMessage insertMsg = deserializer.getInsertMessage(event.getMessage()); org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(insertMsg); - if (!EximUtil.shouldExportTable(withinContext.replicationSpec, qlMdTable)) { + if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) { return; } @@ -58,7 +63,8 @@ public void handle(Context withinContext) throws Exception { withinContext.replicationSpec.setIsReplace(insertMsg.isReplace()); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, - withinContext.replicationSpec); + withinContext.replicationSpec, + withinContext.hiveConf); Iterable files = insertMsg.getFiles(); if (files != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index c3a70cc8f9..143808bb85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import 
org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TJSONProtocol; @@ -35,17 +36,19 @@ public static final String FIELD_NAME = "table"; private final org.apache.hadoop.hive.ql.metadata.Table tableHandle; private final Iterable partitions; + private final HiveConf hiveConf; public TableSerializer(org.apache.hadoop.hive.ql.metadata.Table tableHandle, - Iterable partitions) { + Iterable partitions, HiveConf hiveConf) { this.tableHandle = tableHandle; this.partitions = partitions; + this.hiveConf = hiveConf; } @Override public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { - if (!EximUtil.shouldExportTable(additionalPropertiesProvider, tableHandle)) { + if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, hiveConf)) { return; } @@ -62,8 +65,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi } } - private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) - throws SemanticException, IOException { + private Table addPropertiesToTable(Table table, ReplicationSpec additionalPropertiesProvider) { if (additionalPropertiesProvider.isInReplicationScope()) { // Current replication state must be set on the Table object only for bootstrap dump. // Event replication State will be null in case of bootstrap dump.
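
Illustration only, not part of the patch: a minimal sketch of how the two new flags decide whether a table lands in a metadata-only dump, written against the APIs introduced above (Utils.shouldReplicate, the two new HiveConf.ConfVars entries, and the seven-argument ReplicationSpec constructor used by ReplDumpTask). The class and method names below are made up for the example.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
    import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;

    final class MetadataOnlyDumpSketch {
      // Returns true when the given table would be written into a metadata-only dump:
      // ACID tables are included only when hive.repl.dump.include.acid.tables is set,
      // while temporary and non-native tables are always skipped.
      static boolean wouldDump(Table table, boolean includeAcidTables) {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY, true);
        conf.setBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES, includeAcidTables);
        // Same shape as ReplDumpTask.getNewReplicationSpec(evState, objState, isMetadataOnly):
        // in replication scope, metadata-only, dummy event/object state.
        ReplicationSpec spec = new ReplicationSpec(true, true, "0", "0", false, true, true);
        return Utils.shouldReplicate(spec, table, conf);
      }
    }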
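
Also for illustration: driving the new WITH clause end to end through the test helpers added in this patch (WarehouseInstance.dump with a with-clause list, then load and verifyResults). primary, replica, primaryDbName and replicatedDbName are the existing fixture fields of TestReplicationScenariosAcrossInstances; the table name is hypothetical.

    // Metadata-only bootstrap of one table, then a load on the replica.
    WarehouseInstance.Tuple tuple = primary
        .run("use " + primaryDbName)
        .run("create table sales (id int) partitioned by (country string)")
        .dump(primaryDbName, null,
            Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));

    replica.load(replicatedDbName, tuple.dumpLocation)
        .run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(new String[] { "sales" })
        .run("select * from sales")
        .verifyResults(Collections.emptyList()); // metadata only, so no rows were copied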
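
One compatibility note: EximUtil.createExportDump now takes a HiveConf as its final argument, forwarded to TableSerializer so the serializer can consult the replication dump flags. A caller of the old five-argument form would be updated roughly as follows (identifiers hypothetical).

    // Before: EximUtil.createExportDump(fs, metadataPath, tableHandle, partitions, replicationSpec);
    EximUtil.createExportDump(
        fs,               // FileSystem of the metadata path
        metadataPath,     // where the _metadata file is written
        tableHandle,      // org.apache.hadoop.hive.ql.metadata.Table; null marks the dump a no-op
        partitions,       // Iterable of Partition; null when there is nothing partition-specific to write
        replicationSpec,  // null falls back to a default ReplicationSpec
        hiveConf);        // new parameter introduced by this patch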