diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index 4caa884b11..c552bae56d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -17,8 +17,21 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.WarehouseInstance; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.junit.Assert; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * ReplicationTestUtils - static helper functions for replication test @@ -488,4 +501,38 @@ public static void insertForMerge(WarehouseInstance primary, String primaryDbNam .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation", "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); } + + public static List<String> externalTableBasePathWithClause(String replExternalBase, WarehouseInstance replica) + throws IOException, SemanticException { + Path externalTableLocation = new Path(replExternalBase); + DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); + externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem); + fileSystem.mkdirs(externalTableLocation); + + // this is required since the same filesystem is used in both source and target + return Arrays.asList( + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + externalTableLocation.toString() + "'", + "'distcp.options.pugpb'=''" + ); + } + + public static void assertExternalFileInfo(WarehouseInstance primary, + List<String> expected, + Path externalTableInfoFile) throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + Assert.assertTrue(fileSystem.exists(externalTableInfoFile)); + InputStream inputStream = fileSystem.open(externalTableInfoFile); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + Set<String> tableNames = new HashSet<>(); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + String[] components = line.split(","); + Assert.assertEquals("The file should have tableName,base64encoded(data_location)", + 2, components.length); + tableNames.add(components[0]); + Assert.assertTrue(components[1].length() > 0); + } + Assert.assertTrue(tableNames.containsAll(expected)); + reader.close(); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 6f47056ca7..1f41d46782 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -395,7 +395,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu HiveConf confTemp = new HiveConf(); confTemp.set("hive.repl.enable.move.optimization", "true"); ReplLoadWork replLoadWork = new
ReplLoadWork(confTemp, tuple.dumpLocation, replicadb, - null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), + null, isIncrementalDump, Long.valueOf(tuple.lastReplId), Collections.emptyList()); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null); @@ -2784,7 +2784,7 @@ public void testIncrementalLoadFailAndRetry() throws IOException { } @Test - public void testStatus() throws IOException { + public void testStatus() throws Throwable { String name = testName.getMethodName(); String dbName = createDB(name, driver); String replDbName = dbName + "_dupe"; @@ -2794,25 +2794,25 @@ public void testStatus() throws IOException { // Bootstrap done, now on to incremental. First, we test db-level REPL LOADs. // Both db-level and table-level repl.last.id must be updated. - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "ALTER TABLE " + dbName + ".ptned PARTITION (b=1) RENAME TO PARTITION (b=11)", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "ALTER TABLE " + dbName + ".ptned SET TBLPROPERTIES ('blah'='foo')", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_rn", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, "ptned_rn", lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "ALTER TABLE " + dbName + ".ptned_rn DROP PARTITION (b=11)", replDbName); - lastReplDumpId = verifyAndReturnDbReplStatus(dbName, null, lastReplDumpId, + lastReplDumpId = verifyAndReturnDbReplStatus(dbName, lastReplDumpId, "DROP TABLE " + dbName + ".ptned_rn", replDbName); @@ -2820,37 +2820,28 @@ public void testStatus() throws IOException { // In each of these cases, the table-level repl.last.id must move forward, but the // db-level last.repl.id must not. 
- String lastTblReplDumpId = lastReplDumpId; - lastTblReplDumpId = verifyAndReturnTblReplStatus( - dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + lastReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, "CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b int) STORED AS TEXTFILE", replDbName); - lastTblReplDumpId = verifyAndReturnTblReplStatus( - dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + lastReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, "ALTER TABLE " + dbName + ".ptned2 ADD PARTITION (b=1)", replDbName); - lastTblReplDumpId = verifyAndReturnTblReplStatus( - dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + lastReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, "ALTER TABLE " + dbName + ".ptned2 PARTITION (b=1) RENAME TO PARTITION (b=11)", replDbName); - lastTblReplDumpId = verifyAndReturnTblReplStatus( - dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + lastReplDumpId = verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, "ALTER TABLE " + dbName + ".ptned2 SET TBLPROPERTIES ('blah'='foo')", replDbName); // Note : Not testing table rename because table rename replication is not supported for table-level repl. - String finalTblReplDumpId = verifyAndReturnTblReplStatus( - dbName, "ptned2", lastReplDumpId, lastTblReplDumpId, + verifyAndReturnTblReplStatus( + dbName, "ptned2", lastReplDumpId, "ALTER TABLE " + dbName + ".ptned2 DROP PARTITION (b=11)", replDbName); - /* - Comparisons using Strings for event Ids is wrong. This should be numbers since lexical string comparison - and numeric comparision differ. This requires a broader change where we return the dump Id as long and not string - fixing this here for now as it was observed in one of the builds where "1001".compareTo("998") results - in failure of the assertion below. - */ - assertTrue(new Long(Long.parseLong(finalTblReplDumpId)).compareTo(Long.parseLong(lastTblReplDumpId)) > 0); - // TODO : currently not testing the following scenarios: // a) Multi-db wh-level REPL LOAD - need to add that // b) Insert into tables - quite a few cases need to be enumerated there, including dyn adds. @@ -3494,27 +3485,29 @@ private NotificationEvent createDummyEvent(String dbname, String tblname, long e return event; } - private String verifyAndReturnDbReplStatus(String dbName, String tblName, + private String verifyAndReturnDbReplStatus(String dbName, String prevReplDumpId, String cmd, String replDbName) throws IOException { run(cmd, driver); String lastReplDumpId = incrementalLoadAndVerify(dbName, prevReplDumpId, replDbName).lastReplId; - if (tblName != null){ - verifyRun("REPL STATUS " + replDbName + "." + tblName, lastReplDumpId, driverMirror); - } assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); return lastReplDumpId; } - // Tests that doing a table-level REPL LOAD updates table repl.last.id, but not db-level repl.last.id + // Tests that verify table's last repl ID private String verifyAndReturnTblReplStatus( - String dbName, String tblName, String lastDbReplDumpId, String prevReplDumpId, String cmd, - String replDbName) throws IOException { + String dbName, String tblName, String lastDbReplDumpId, String cmd, + String replDbName) throws IOException, TException { run(cmd, driver); String lastReplDumpId - = incrementalLoadAndVerify(dbName + "." + tblName, prevReplDumpId, replDbName + "." 
+ tblName).lastReplId; - verifyRun("REPL STATUS " + replDbName, lastDbReplDumpId, driverMirror); - assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(prevReplDumpId)); + = incrementalLoadAndVerify(dbName, lastDbReplDumpId, replDbName).lastReplId; + verifyRun("REPL STATUS " + replDbName, lastReplDumpId, driverMirror); + assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(lastDbReplDumpId)); + + Table tbl = metaStoreClientMirror.getTable(replDbName, tblName); + String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + assertTrue(Long.parseLong(tblLastReplId) > Long.parseLong(lastDbReplDumpId)); + assertTrue(Long.parseLong(tblLastReplId) <= Long.parseLong(lastReplDumpId)); return lastReplDumpId; } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index c0698311ed..fbdbb01ba0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -33,20 +33,15 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; -import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.security.UserGroupInformation; import org.junit.BeforeClass; import org.junit.Test; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -742,34 +737,11 @@ public Table apply(@Nullable Table table) { } private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { - Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE); - DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); - externalTableLocation = PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem); - fileSystem.mkdirs(externalTableLocation); - - // this is required since the same filesystem is used in both source and target - return Arrays.asList( - "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" - + externalTableLocation.toString() + "'", - "'distcp.options.pugpb'=''" - ); + return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); } private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile) throws IOException { - DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); - assertTrue(fileSystem.exists(externalTableInfoFile)); - InputStream inputStream = fileSystem.open(externalTableInfoFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); - Set<String> tableNames = new HashSet<>(); - for (String line = reader.readLine(); line != null; line = reader.readLine()) { - String[] components = line.split(","); - assertEquals("The file should have tableName,base64encoded(data_location)", - 2, components.length); - tableNames.add(components[0]); - assertTrue(components[1].length() > 0); - } - assertTrue(tableNames.containsAll(expected)); - reader.close(); + 
ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java index 76197183d9..3a86cb7ca3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java @@ -221,18 +221,16 @@ public void testTableLevelDumpMigration() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create table t1 (i int, j int)") - .dump(primaryDbName+".t1", null); + .dump(primaryDbName+".['t1']", null); replica.run("create database " + replicatedDbName); - replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation); - assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - assertTrue(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters())); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); tuple = primary.run("use " + primaryDbName) .run("insert into t1 values (1, 2)") - .dump(primaryDbName+".t1", tuple.lastReplicationId); - replica.loadWithoutExplain(replicatedDbName + ".t1", tuple.dumpLocation); + .dump(primaryDbName+".['t1']", tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters())); - assertFalse(ReplUtils.isFirstIncPending(replica.getTable(replicatedDbName, "t1").getParameters())); } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java new file mode 100644 index 0000000000..d84c6e7849 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; + +/** + * Tests Table level replication scenarios. + */ +public class TestTableLevelReplicationScenarios extends BaseReplicationScenariosAcidTables { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map<String, String> overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestTableLevelReplicationScenarios.class); + } + + enum CreateTableType { + FULL_ACID, MM_ACID, NON_ACID, EXTERNAL + } + + class CreateTableInfo { + String tableName; + CreateTableType type; + boolean isPartitioned; + + CreateTableInfo(String tableName, CreateTableType type, boolean isPartitioned) { + this.tableName = tableName; + this.type = type; + this.isPartitioned = isPartitioned; + } + } + + private void createTables(List<CreateTableInfo> createTblsInfo) throws Throwable { + for (CreateTableInfo tblInfo : createTblsInfo) { + StringBuilder strBuilder = new StringBuilder("create "); + if (tblInfo.type == CreateTableType.EXTERNAL) { + strBuilder.append(" external "); + } + strBuilder.append(" table ").append(primaryDbName).append(".").append(tblInfo.tableName); + + if (tblInfo.isPartitioned) { + strBuilder.append(" (a int) partitioned by (b int) "); + } else { + strBuilder.append(" (a int, b int) "); + } + + if (tblInfo.type == CreateTableType.FULL_ACID) { + strBuilder.append(" clustered by (a) into 2 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")"); + } else if (tblInfo.type == CreateTableType.MM_ACID) { + strBuilder.append(" tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")"); + } + + String createTableCmd = strBuilder.toString(); + primary.run("use " + primaryDbName) + .run(createTableCmd) + .run("insert into " + tblInfo.tableName + " values(1, 10)"); + } + } + + private void createTables(String[] tableNames, CreateTableType type) throws Throwable { + List<CreateTableInfo> createTablesInfo = new ArrayList<>(); + for (String tblName : tableNames) { + createTablesInfo.add(new CreateTableInfo(tblName, type, false)); + } + createTables(createTablesInfo); + } + + private String replicateAndVerify(String replPolicy, String lastReplId, + List<String> dumpWithClause, + List<String> loadWithClause, + String[] expectedTables) throws Throwable { + if (dumpWithClause == null) { + dumpWithClause = new ArrayList<>(); + } + if (loadWithClause == null) {
+ loadWithClause = new ArrayList<>(); + } + + // For bootstrap replication, drop the target database before triggering it. + if (lastReplId == null) { + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .dump(replPolicy, lastReplId, dumpWithClause); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(expectedTables); + return tuple.lastReplicationId; + } + + @Test + public void testBasicBootstrapWithIncludeList() throws Throwable { + String[] originalNonAcidTables = new String[] {"t1", "t2" }; + String[] originalFullAcidTables = new String[] {"t3", "t4" }; + String[] originalMMAcidTables = new String[] {"t5" }; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 2 tables are replicated to target. + String replPolicy = primaryDbName + ".['t1', 't4', 't5']"; + String[] replicatedTables = new String[] {"t1", "t4", "t5" }; + replicateAndVerify(replPolicy, null, null, null, replicatedTables); + } + + @Test + public void testBasicBootstrapWithIncludeAndExcludeList() throws Throwable { + String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t100" }; + createTables(originalTables, CreateTableType.NON_ACID); + + // Replicate and verify if only 3 tables are replicated to target. + String replPolicy = primaryDbName + ".['t1*', 't3'].['t100']"; + String[] replicatedTables = new String[] {"t1", "t11", "t3" }; + replicateAndVerify(replPolicy, null, null, null, replicatedTables); + } + + @Test + public void testBasicIncrementalWithIncludeList() throws Throwable { + WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + .dump(primaryDbName, null); + replica.load(replicatedDbName, tupleBootstrap.dumpLocation); + + String[] originalNonAcidTables = new String[] {"t1", "t2" }; + String[] originalFullAcidTables = new String[] {"t3", "t4" }; + String[] originalMMAcidTables = new String[] {"t5" }; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only 2 tables are replicated to target. + String replPolicy = primaryDbName + ".['t1', 't5']"; + String[] replicatedTables = new String[] {"t1", "t5" }; + replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables); + } + + @Test + public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable { + WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName) + .dump(primaryDbName, null); + replica.load(replicatedDbName, tupleBootstrap.dumpLocation); + + String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" }; + createTables(originalTables, CreateTableType.NON_ACID); + + // Replicate and verify if only 3 tables are replicated to target. 
+ String replPolicy = primaryDbName + ".['t1+', 't2'].['t11', 't3']"; + String[] replicatedTables = new String[] {"t1", "t111", "t2" }; + replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables); + } + + @Test + public void testReplDumpWithIncorrectTablePolicy() throws Throwable { + String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" }; + createTables(originalTables, CreateTableType.NON_ACID); + + // Invalid repl policy where abrubtly placed DOT which causes ParseException during REPL dump. + String[] replicatedTables = new String[]{}; + boolean failed; + String[] invalidReplPolicies = new String[] { + primaryDbName + ".t1.t2", // Two explicit table names not allowed. + primaryDbName + ".['t1'].t2", // Table name and include list not allowed. + primaryDbName + ".t1.['t2']", // Table name and exclude list not allowed. + primaryDbName + ".[t1].t2", // Table name and include list not allowed. + primaryDbName + ".['t1+'].", // Abrubtly ended dot. + primaryDbName + "..[]" // Multiple dots + }; + for (String replPolicy : invalidReplPolicies) { + failed = false; + try { + replicateAndVerify(replPolicy, null, null, null, replicatedTables); + } catch (Exception ex) { + LOG.info("Got exception: {}", ex.getMessage()); + Assert.assertTrue(ex instanceof ParseException); + failed = true; + } + Assert.assertTrue(failed); + } + + // Invalid pattern where we didn't enclose table pattern within single or double quotes. + String replPolicy = primaryDbName + ".[t1].[t2]"; + failed = false; + try { + replicateAndVerify(replPolicy, null, null, null, replicatedTables); + } catch (Exception ex) { + LOG.info("Got exception: {}", ex.getMessage()); + Assert.assertTrue(ex instanceof SemanticException); + Assert.assertTrue(ex.getMessage().equals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg())); + failed = true; + } + Assert.assertTrue(failed); + } + + @Test + public void testFullDbBootstrapReplicationWithDifferentReplPolicyFormats() throws Throwable { + String[] originalTables = new String[] {"t1", "t200", "t3333" }; + createTables(originalTables, CreateTableType.NON_ACID); + + // List of repl policy formats that leads to Full DB replication. + String[] fullDbReplPolicies = new String[] { + primaryDbName + ".['.*?']", + primaryDbName + ".['.*?'].[]" + }; + + // Replicate and verify if all 3 tables are replicated to target. + for (String replPolicy : fullDbReplPolicies) { + replicateAndVerify(replPolicy, null, null, null, originalTables); + } + } + + @Test + public void testCaseInSensitiveNatureOfReplPolicy() throws Throwable { + String[] originalTables = new String[] {"a1", "aA11", "B2", "Cc3" }; + createTables(originalTables, CreateTableType.NON_ACID); + + // Replicate and verify if 2 tables are replicated as per policy. 
+ String replPolicy = primaryDbName.toUpperCase() + ".['.*a1+', 'cc3', 'B2'].['AA1+', 'b2']"; + String[] replicatedTables = new String[] {"a1", "cc3" }; + replicateAndVerify(replPolicy, null, null, null, replicatedTables); + } + + @Test + public void testBootstrapAcidTablesIncrementalPhaseWithIncludeAndExcludeList() throws Throwable { + String[] originalNonAcidTables = new String[] {"a1", "b2" }; + String[] originalFullAcidTables = new String[] {"a2", "b1" }; + String[] originalMMAcidTables = new String[] {"a3", "a4" }; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalFullAcidTables, CreateTableType.FULL_ACID); + createTables(originalMMAcidTables, CreateTableType.MM_ACID); + + // Replicate and verify if only non-acid tables are replicated to target. + List<String> dumpWithoutAcidClause = Collections.singletonList( + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'"); + String replPolicy = primaryDbName + ".['a[0-9]+', 'b1'].['a4']"; + String[] bootstrapReplicatedTables = new String[] {"a1" }; + String lastReplId = replicateAndVerify(replPolicy, null, dumpWithoutAcidClause, null, bootstrapReplicatedTables); + + // Enable acid tables for replication. + List<String> dumpWithAcidBootstrapClause = Arrays.asList( + "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'"); + String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a3", "b1" }; + replicateAndVerify(replPolicy, lastReplId, dumpWithAcidBootstrapClause, null, incrementalReplicatedTables); + } + + @Test + public void testBootstrapExternalTablesWithIncludeAndExcludeList() throws Throwable { + String[] originalNonAcidTables = new String[] {"a1", "b2" }; + String[] originalExternalTables = new String[] {"a2", "b1" }; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalExternalTables, CreateTableType.EXTERNAL); + + // Replicate and verify if only 2 tables are replicated to target. + List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']"; + String[] replicatedTables = new String[] {"a2", "b2" }; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .dump(replPolicy, null, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. + Assert.assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + // Verify that the external table info contains only table "a2".
+ ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), + new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(replicatedTables); + } + + @Test + public void testBootstrapExternalTablesIncrementalPhaseWithIncludeAndExcludeList() throws Throwable { + String[] originalNonAcidTables = new String[] {"a1", "b2" }; + String[] originalExternalTables = new String[] {"a2", "b1" }; + createTables(originalNonAcidTables, CreateTableType.NON_ACID); + createTables(originalExternalTables, CreateTableType.EXTERNAL); + + // Bootstrap should exclude external tables. + List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List<String> dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'" + ); + String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']"; + String[] bootstrapReplicatedTables = new String[] {"b2" }; + String lastReplId = replicateAndVerify(replPolicy, null, dumpWithClause, loadWithClause, bootstrapReplicatedTables); + + // Enable external tables replication and bootstrap in incremental phase. + String[] incrementalReplicatedTables = new String[] {"a2", "b2" }; + dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'", + "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .dump(replPolicy, lastReplId, dumpWithClause); + + // the _external_tables_file info should be created as external tables are to be replicated. + Assert.assertTrue(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + // Verify that the external table info contains only table "a2".
+ ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), + new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(incrementalReplicatedTables); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java index b9ef8b780e..eeda423ceb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java @@ -582,7 +582,7 @@ public void testReplDump() throws Exception { assertEquals("db name", dbName.toLowerCase(), dbObj.getDbname()); resetAuthorizer(); - status = driver.compile("repl dump " + fullInTableName); + status = driver.compile("repl dump " + dbName + ".['" + inDbTableName + "']"); assertEquals(0, status); inputs = getHivePrivilegeObjectInputs().getLeft(); dbObj = inputs.get(0); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 554df3c6bf..bcfbf5b067 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -514,6 +514,9 @@ REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."), REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020, "Source of replication (repl.source.for) is not set in the database properties."), + REPL_INVALID_DB_OR_TABLE_PATTERN(20021, + "Invalid pattern for the DB or table name in the replication policy. " + + "It should be a valid regex enclosed within single or double quotes."), // An exception from runtime that will show the full stack to client UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 88ea73f8d5..b66c308712 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1518,29 +1518,16 @@ private boolean allowOperationInReplicationScope(Hive db, String tableName, private int remFirstIncPendFlag(Hive hive, ReplRemoveFirstIncLoadPendFlagDesc desc) throws HiveException, TException { String dbNameOrPattern = desc.getDatabaseName(); - String tableNameOrPattern = desc.getTableName(); Map parameters; - // For database level load tableNameOrPattern will be null. Flag is set only in database for db level load. - if (tableNameOrPattern != null && !tableNameOrPattern.isEmpty()) { - // For table level load, dbNameOrPattern is db name and not a pattern. - for (String tableName : Utils.matchesTbl(hive, dbNameOrPattern, tableNameOrPattern)) { - org.apache.hadoop.hive.metastore.api.Table tbl = hive.getMSC().getTable(dbNameOrPattern, tableName); - parameters = tbl.getParameters(); - String incPendPara = parameters != null ? 
parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null; - if (incPendPara != null) { - parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG); - hive.getMSC().alter_table(dbNameOrPattern, tableName, tbl); - } - } - } else { - for (String dbName : Utils.matchesDb(hive, dbNameOrPattern)) { - Database database = hive.getMSC().getDatabase(dbName); - parameters = database.getParameters(); - String incPendPara = parameters != null ? parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null; - if (incPendPara != null) { - parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG); - hive.getMSC().alterDatabase(dbName, database); - } + + // Flag is set only in database for db level load. + for (String dbName : Utils.matchesDb(hive, dbNameOrPattern)) { + Database database = hive.getMSC().getDatabase(dbName); + parameters = database.getParameters(); + String incPendPara = parameters != null ? parameters.get(ReplUtils.REPL_FIRST_INC_PENDING_FLAG) : null; + if (incPendPara != null) { + parameters.remove(ReplUtils.REPL_FIRST_INC_PENDING_FLAG); + hive.getMSC().alterDatabase(dbName, database); } } return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 2f72e23526..8c963be85d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -184,9 +184,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) { bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0); - LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {} and table {}", - work.dbNameOrPattern, - work.tableNameOrPattern); + LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", + work.dbNameOrPattern); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -201,7 +200,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId); IMetaStoreClient.NotificationFilter evFilter = new AndFilter( - new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern), + new ReplEventFilter(work.replScope), new EventBoundaryFilter(work.eventFrom, work.eventTo)); EventUtils.MSClientNotificationFetcher evFetcher @@ -255,7 +254,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); try (Writer writer = new Writer(dumpRoot, conf)) { - for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { + for (String tableName : 
Utils.matchesTbl(hiveDb, dbName, work.replScope)) { try { Table table = hiveDb.getTable(dbName, tableName); @@ -297,8 +296,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) db, conf, getNewEventOnlyReplicationSpec(ev.getEventId()), - work.dbNameOrPattern, - work.tableNameOrPattern + work.replScope ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); @@ -321,7 +319,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0L); - LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern); + LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -337,7 +335,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th " with first incremental dump pending : " + dbName); } replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), - Utils.getAllTables(hiveDb, dbName).size(), + Utils.getAllTables(hiveDb, dbName, work.replScope).size(), hiveDb.getAllFunctions().size()); replLogger.startLog(); Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb); @@ -349,7 +347,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); try (Writer writer = new Writer(dbRoot, conf)) { - for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { + for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index d32be969e1..247066c451 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import com.google.common.primitives.Ints; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.LoggerFactory; @@ -28,7 +29,8 @@ Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ReplDumpWork implements Serializable { - final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath; + final ReplScope replScope; + final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; final Long eventFrom; Long eventTo; private Integer maxEventLimit; @@ -38,11 +40,11 @@ public static void injectNextDumpDirForTest(String dumpDir) { testInjectDumpDir = dumpDir; } - public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern, + public ReplDumpWork(ReplScope replScope, Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit, String resultTempPath) { - this.dbNameOrPattern = dbNameOrPattern; - this.tableNameOrPattern = tableNameOrPattern; + this.replScope = replScope; + this.dbNameOrPattern = replScope.getDbName(); this.eventFrom = eventFrom; 
this.eventTo = eventTo; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 88e6327eab..f4f98a69d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -149,9 +149,7 @@ a database ( directory ) switch (next.eventType()) { case Database: DatabaseEvent dbEvent = (DatabaseEvent) next; - dbTracker = - new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, work.tableNameToLoadIn, loadTaskTracker) - .tasks(); + dbTracker = new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker).tasks(); loadTaskTracker.update(dbTracker); if (work.hasDbState()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); @@ -174,8 +172,7 @@ a database ( directory ) listing before providing the lower level listing. This is also required such that the dbTracker / tableTracker are setup correctly always. */ - TableContext tableContext = - new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); TableEvent tableEvent = (TableEvent) next; LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), tableContext, loadTaskTracker); @@ -215,8 +212,7 @@ a database ( directory ) hence we know here that the table should exist and there should be a lastPartitionName */ PartitionEvent event = (PartitionEvent) next; - TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn, - work.tableNameToLoadIn); + TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn); LoadPartitions loadPartitions = new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker, event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index c5e083142a..f1f764e94b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -42,7 +42,6 @@ Explain.Level.EXTENDED }) public class ReplLoadWork implements Serializable { final String dbNameToLoadIn; - final String tableNameToLoadIn; final String dumpDirectory; final String bootstrapDumpToCleanTables; boolean needCleanTablesFromBootstrap; @@ -63,9 +62,8 @@ final LineageState sessionStateLineageState; public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo, + LineageState lineageState, boolean isIncrementalDump, Long eventTo, List pathsToCopyIterator) throws IOException { - this.tableNameToLoadIn = tableNameToLoadIn; sessionStateLineageState = lineageState; this.dumpDirectory = dumpDirectory; this.dbNameToLoadIn = dbNameToLoadIn; @@ -74,8 +72,7 @@ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoad rootTask = null; if (isIncrementalDump) { - incrementalLoadTasksBuilder = - new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory, + incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory, new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo); /* diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index 87477278ba..d603e69c9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -94,7 +94,7 @@ public TaskTracker tasks() throws IOException, SemanticException { pkDumpMetaData.setPayload(pksString); tasks.addAll(pkHandler.handle( new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf, + dbNameToLoadIn, fromPath.toString(), null, pkDumpMetaData, context.hiveConf, context.hiveDb, context.nestedContext, LOG))); } @@ -105,7 +105,7 @@ public TaskTracker tasks() throws IOException, SemanticException { ukDumpMetaData.setPayload(uksString); tasks.addAll(ukHandler.handle( new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf, + dbNameToLoadIn, fromPath.toString(), null, ukDumpMetaData, context.hiveConf, context.hiveDb, context.nestedContext, LOG))); } @@ -116,7 +116,7 @@ public TaskTracker tasks() throws IOException, SemanticException { nnDumpMetaData.setPayload(nnsString); tasks.addAll(nnHandler.handle( new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf, + dbNameToLoadIn, fromPath.toString(), null, nnDumpMetaData, context.hiveConf, context.hiveDb, context.nestedContext, LOG))); } @@ -127,7 +127,7 @@ public TaskTracker tasks() throws IOException, SemanticException { fkDumpMetaData.setPayload(fksString); tasks.addAll(fkHandler.handle( new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf, + dbNameToLoadIn, fromPath.toString(), null, fkDumpMetaData, context.hiveConf, context.hiveDb, context.nestedContext, LOG))); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 343789584b..e33ad2eda7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -48,16 +48,12 @@ private final DatabaseEvent event; private final String dbNameToLoadIn; - private final boolean isTableLevelLoad; - public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, String tblNameToLoadIn, - TaskTracker loadTaskTracker) { + public LoadDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, TaskTracker loadTaskTracker) { this.context = context; this.event = event; this.dbNameToLoadIn = dbNameToLoadIn; this.tracker = new TaskTracker(loadTaskTracker); - //TODO : Load database should not be called for table level load. - isTableLevelLoad = tblNameToLoadIn != null && !tblNameToLoadIn.isEmpty(); } public TaskTracker tasks() throws Exception { @@ -121,7 +117,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { private Task createDbTask(Database dbObj) { // note that we do not set location - for repl load, we want that auto-created. 
CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(dbObj.getName(), dbObj.getDescription(), null, false, - updateDbProps(dbObj, context.dumpDirectory, !isTableLevelLoad)); + updateDbProps(dbObj, context.dumpDirectory, true)); // If it exists, we want this to be an error condition. Repl Load is not intended to replace a // db. // TODO: we might revisit this in create-drop-recreate cases, needs some thinking on. @@ -130,7 +126,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { } private Task alterDbTask(Database dbObj) { - return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory, !isTableLevelLoad), + return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory, true), context.hiveConf); } @@ -176,7 +172,7 @@ private boolean isDbEmpty(String dbName) throws HiveException { public AlterDatabase(Context context, DatabaseEvent event, String dbNameToLoadIn, TaskTracker loadTaskTracker) { - super(context, event, dbNameToLoadIn, null, loadTaskTracker); + super(context, event, dbNameToLoadIn, loadTaskTracker); } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index 7f981fd969..2c053ef717 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -84,7 +84,7 @@ public TaskTracker tasks() throws IOException, SemanticException { CreateFunctionHandler handler = new CreateFunctionHandler(); List> tasks = handler.handle( new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, null, context.hiveConf, + dbNameToLoadIn, fromPath.toString(), null, null, context.hiveConf, context.hiveDb, context.nestedContext, LOG) ); createFunctionReplLogTask(tasks, handler.getFunctionName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 6e19cb4b69..39d85ba079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -97,7 +97,7 @@ public LoadPartitions(Context context, ReplLogger replLogger, TableContext table this.lastReplicatedPartition = lastReplicatedPartition; this.tableContext = tableContext; - this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn)); + this.tableDesc = event.tableDesc(dbNameToLoadIn); this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 00c7be75c6..8a96136e28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -94,7 +94,7 @@ public TaskTracker tasks(boolean isBootstrapDuringInc) throws Exception { String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty; // Create table associated with the import // Executed if relevant, and used to contain all the other details about the table if not. 
- ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName)); + ImportTableDesc tableDesc = event.tableDesc(dbName); Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); // Normally, on import, trying to create a table or a partition in a db that does not yet exist diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java index dbda41d827..16679db0c7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java @@ -17,42 +17,18 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; -import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.ImportTableDesc; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; public class TableContext { final String dbNameToLoadIn; private final TaskTracker parentTracker; - // this will only be available when we are doing table load only in replication not otherwise - private final String tableNameToLoadIn; - public TableContext(TaskTracker parentTracker, String dbNameToLoadIn, - String tableNameToLoadIn) { + public TableContext(TaskTracker parentTracker, String dbNameToLoadIn) { this.dbNameToLoadIn = dbNameToLoadIn; this.parentTracker = parentTracker; - this.tableNameToLoadIn = tableNameToLoadIn; } boolean waitOnPrecursor() { return parentTracker.hasTasks(); } - - ImportTableDesc overrideProperties(ImportTableDesc importTableDesc) - throws SemanticException { - if (StringUtils.isNotBlank(tableNameToLoadIn)) { - importTableDesc.setTableName(tableNameToLoadIn); - - //For table level load, add this property to avoid duplicate copy. - // This flag will be set to false after first incremental load is done. This flag is used by - // repl copy task to check if duplicate file check is required or not. This flag is used by - // compaction to check if compaction can be done for this database or not. If compaction is - // done before first incremental then duplicate check will fail as compaction may change - // the directory structure. - importTableDesc.getTblProps().put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); - } - return importTableDesc; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 13de791fb3..2042798012 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -67,27 +66,25 @@ * Iterate through the dump directory and create tasks to load the events. 
*/ public class IncrementalLoadTasksBuilder { - private final String dbName, tableName; + private final String dbName; private final IncrementalLoadEventsIterator iterator; private final HashSet inputs; private final HashSet outputs; private Logger log; private final HiveConf conf; private final ReplLogger replLogger; - private static long numIteration; + private static long numIteration = 0; private final Long eventTo; - public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath, + public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) { this.dbName = dbName; - this.tableName = tableName; this.iterator = iterator; inputs = new HashSet<>(); outputs = new HashSet<>(); log = null; this.conf = conf; replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents()); - numIteration = 0; replLogger.startLog(); this.eventTo = eventTo; } @@ -106,14 +103,14 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP String location = dir.getPath().toUri().toString(); DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf); - if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) { - this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}", - eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName)) { + this.log.debug("Skipping event {} from {} for DB {} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tracker.numberOfTasks()); continue; } - this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}", - eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks()); + this.log.debug("Loading event {} from {} for DB {} maxTasks: {}", + eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tracker.numberOfTasks()); // event loads will behave similar to table loads, with one crucial difference // precursor order is strict, and each event must be processed after the previous one. @@ -134,7 +131,7 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the // entire chain - MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location, + MessageHandler.Context context = new MessageHandler.Context(dbName, location, taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log); List> evTasks = analyzeEventLoad(context); @@ -157,17 +154,11 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP // if no events were replayed, then add a task to update the last repl id of the database/table to last event id. if (taskChainTail == evTaskRoot) { String lastEventid = eventTo.toString(); - if (StringUtils.isEmpty(tableName)) { - taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail); - this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid); - } else { - taskChainTail = tableUpdateReplStateTask(dbName, tableName, null, lastEventid, taskChainTail); - this.log.debug("no events to replay, set last repl id of table " + dbName + "." 
+ tableName + " to " + - lastEventid); - } + taskChainTail = dbUpdateReplStateTask(dbName, lastEventid, taskChainTail); + this.log.debug("no events to replay, set last repl id of db " + dbName + " to " + lastEventid); } - ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName, tableName); + ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(dbName); Task updateIncPendTask = TaskFactory.get(new DDLWork(inputs, outputs, desc), conf); taskChainTail.addDependentTask(updateIncPendTask); taskChainTail = updateIncPendTask; @@ -200,30 +191,19 @@ private boolean isEventNotReplayed(Map params, FileStatus dir, D return true; } - private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) { - // if database itself is null then we can not filter out anything. - if (dbName == null || dbName.isEmpty()) { + private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName) { + // If database itself is null then we can not filter out anything. + if (StringUtils.isBlank(dbName)) { + return true; + } + + try { + Database database = Hive.get().getDatabase(dbName); + return (database == null) || isEventNotReplayed(database.getParameters(), dir, dumpType); + } catch (HiveException e) { + // May be the db is getting created in this load + log.debug("Failed to get the database " + dbName); return true; - } else if ((tableName == null) || (tableName.isEmpty())) { - Database database; - try { - database = Hive.get().getDatabase(dbName); - return database == null ? true : isEventNotReplayed(database.getParameters(), dir, dumpType); - } catch (HiveException e) { - //may be the db is getting created in this load - log.debug("failed to get the database " + dbName); - return true; - } - } else { - Table tbl; - try { - tbl = Hive.get().getTable(dbName, tableName); - return isEventNotReplayed(tbl.getParameters(), dir, dumpType); - } catch (HiveException e) { - // may be the table is getting created in this load - log.debug("failed to get the table " + dbName + "." + tableName); - return true; - } } } @@ -241,7 +221,7 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa inputs.addAll(messageHandler.readEntities()); outputs.addAll(messageHandler.writeEntities()); - return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); + return addUpdateReplStateTasks(messageHandler.getUpdatedMetadata(), tasks); } private Task getMigrationCommitTxnTask(String dbName, String tableName, @@ -315,7 +295,6 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa } private List> addUpdateReplStateTasks( - boolean isDatabaseLoad, UpdatedMetaDataTracker updatedMetaDataTracker, List> importTasks) throws SemanticException { // If no import tasks generated by the event then no need to update the repl state to any object. @@ -347,7 +326,7 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa if (needCommitTx) { if (updateMetaData.getPartitionsList().size() > 0) { updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, - updateMetaData.getPartitionsList(), replState, isDatabaseLoad, barrierTask); + updateMetaData.getPartitionsList(), replState, true, barrierTask); tasks.add(updateReplIdTask); // commit txn task will update repl id for table and database also. 
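// Editorial note, not part of the patch: with table-level replication removed, every
// incremental load is a database-level load, so the former isDatabaseLoad argument is
// hard-coded to true here and in the two getMigrationCommitTxnTask calls below.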
break; @@ -363,7 +342,7 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa if (tableName != null) { if (needCommitTx) { updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, null, - replState, isDatabaseLoad, barrierTask); + replState, true, barrierTask); tasks.add(updateReplIdTask); // commit txn task will update repl id for database also. break; @@ -375,9 +354,9 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa // If any table/partition is updated, then update repl state in db object if (needCommitTx) { updateReplIdTask = getMigrationCommitTxnTask(dbName, null, null, - replState, isDatabaseLoad, barrierTask); + replState, true, barrierTask); tasks.add(updateReplIdTask); - } else if (isDatabaseLoad) { + } else { // For table level load, need not update replication state for the database updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); tasks.add(updateReplIdTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index f9f13e1a4c..2fcbe64aa7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -19,7 +19,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.ReplConst; +import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java index e04a0f3dce..26c7a606bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java @@ -430,13 +430,11 @@ public static String getLocalDirList(Configuration conf) { return null; } - public static String getReplPolicy(String dbName, String tableName) { + public static String getReplPolicy(String dbName) { if ((dbName == null) || (dbName.isEmpty())) { return "*.*"; - } else if ((tableName == null) || (tableName.isEmpty())) { - return dbName.toLowerCase() + ".*"; } else { - return dbName.toLowerCase() + "." + tableName.toLowerCase(); + return dbName.toLowerCase() + ".*"; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index bf9aa39307..cfdf180a9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -393,6 +393,8 @@ TOK_REPL_LOAD; TOK_REPL_STATUS; TOK_REPL_CONFIG; TOK_REPL_CONFIG_LIST; +TOK_REPL_TABLES; +TOK_REPL_TABLES_LIST; TOK_TO; TOK_ONLY; TOK_SUMMARY; @@ -891,23 +893,23 @@ replDumpStatement @init { pushMsg("replication dump statement", state); } @after { popMsg(state); } : KW_REPL KW_DUMP - (dbName=identifier) (DOT tblName=identifier)? + (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? (KW_FROM (eventId=Number) (KW_TO (rangeEnd=Number))? (KW_LIMIT (batchSize=Number))? )? (KW_WITH replConf=replConfigs)? - -> ^(TOK_REPL_DUMP $dbName ^(TOK_TABNAME $tblName)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?) + -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?) 
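    // Editorial examples, not part of the patch: statements accepted by this rule now include
    //   repl dump testDb
    //   repl dump testDb.['test_table'] from 100 to 200 limit 10
    //   repl dump testDb.['t1','t2'].['t2'] with ('key.1'='value.1')
    // where the first bracketed list is the include list and the optional second one is the
    // exclude list. Each table pattern must be a quoted StringLiteral; a bare identifier is
    // parsed as TOK_NULL and later rejected with REPL_INVALID_DB_OR_TABLE_PATTERN.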
; replLoadStatement @init { pushMsg("replication load statement", state); } @after { popMsg(state); } : KW_REPL KW_LOAD - ((dbName=identifier) (DOT tblName=identifier)?)? + (dbName=identifier)? KW_FROM (path=StringLiteral) (KW_WITH replConf=replConfigs)? - -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? ^(TOK_TABNAME $tblName)? $replConf?) + -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? $replConf?) ; replConfigs @@ -924,13 +926,37 @@ replConfigsList keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+) ; +replTableLevelPolicy +@init { pushMsg("replication table level policy definition", state); } +@after { popMsg(state); } + : + ((replTablesIncludeList=replTablesList) (DOT replTablesExcludeList=replTablesList)?) + -> ^(TOK_REPL_TABLES $replTablesIncludeList $replTablesExcludeList?) + ; + +replTablesList +@init { pushMsg("replication table name or comma separated table names pattern list", state); } +@after { popMsg(state); } + : + (LSQUARE (tablePattern (COMMA tablePattern)*)? RSQUARE) -> ^(TOK_REPL_TABLES_LIST tablePattern*) + ; + +tablePattern +@init { pushMsg("Table name pattern", state); } +@after { popMsg(state); } + : + (pattern=StringLiteral) -> $pattern + | + (identifier) -> TOK_NULL + ; + replStatusStatement @init { pushMsg("replication status statement", state); } @after { popMsg(state); } : KW_REPL KW_STATUS - (dbName=identifier) (DOT tblName=identifier)? + (dbName=identifier) (KW_WITH replConf=replConfigs)? - -> ^(TOK_REPL_STATUS $dbName ^(TOK_TABNAME $tblName)? $replConf?) + -> ^(TOK_REPL_STATUS $dbName $replConf?) ; ddlStatement diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index b4b849c4ec..ea4e331490 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.Warehouse; @@ -36,7 +37,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -56,22 +56,25 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_NULL; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS; -import static 
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES_LIST; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO; public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { - // Database name or pattern - private String dbNameOrPattern; - // Table name or pattern - private String tblNameOrPattern; + // Replication Scope + private ReplScope replScope = new ReplScope(); + private Long eventFrom; private Long eventTo; private Integer maxEventLimit; + // Base path for REPL LOAD private String path; // Added conf member to set the REPL command specific config entries without affecting the configs @@ -101,23 +104,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { switch (ast.getToken().getType()) { case TOK_REPL_DUMP: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump"); - try { - initReplDump(ast); - } catch (HiveException e) { - throw new SemanticException(e.getMessage(), e); - } analyzeReplDump(ast); break; } case TOK_REPL_LOAD: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load"); - initReplLoad(ast); analyzeReplLoad(ast); break; } case TOK_REPL_STATUS: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status"); - initReplStatus(ast); analyzeReplStatus(ast); break; } @@ -134,50 +130,93 @@ private void setTxnConfigs() { } } + private void setReplDumpTablesList(Tree replTablesNode) throws HiveException { + assert(replTablesNode.getChildCount() <= 2); + + // Traverse the children which can be either just include tables list or both include + // and exclude tables lists. + for (int listIdx = 0; listIdx < replTablesNode.getChildCount(); listIdx++) { + Tree tablesListNode = replTablesNode.getChild(listIdx); + assert(tablesListNode.getType() == TOK_REPL_TABLES_LIST); + + List tablesList = new ArrayList<>(); + for (int child = 0; child < tablesListNode.getChildCount(); child++) { + Tree tablePatternNode = tablesListNode.getChild(child); + if (tablePatternNode.getType() == TOK_NULL) { + LOG.error(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg()); + throw new SemanticException(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg()); + } + tablesList.add(unescapeSQLString(tablePatternNode.getText())); + } + + if (listIdx == 0) { + LOG.info("ReplScope: Set Included Tables List: {}", tablesList); + replScope.setIncludedTablePatterns(tablesList); + } else { + LOG.info("ReplScope: Set Excluded Tables List: {}", tablesList); + replScope.setExcludedTablePatterns(tablesList); + } + } + } + private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); boolean isMetaDataOnly = false; - dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); - - // skip the first node, which is always required - int currNode = 1; - while (currNode < numChildren) { - if (ast.getChild(currNode).getType() == TOK_REPL_CONFIG) { - Map replConfigs - = DDLSemanticAnalyzer.getProps((ASTNode) ast.getChild(currNode).getChild(0)); - if (null != replConfigs) { - for (Map.Entry config : replConfigs.entrySet()) { - conf.set(config.getKey(), config.getValue()); + + String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + LOG.info("ReplScope: Set DB Name: {}", dbNameOrPattern); + replScope.setDbName(dbNameOrPattern); + + // Skip the first node, which is always required + int childIdx = 1; + while (childIdx < numChildren) { + Tree currNode = 
ast.getChild(childIdx); + switch (currNode.getType()) { + case TOK_REPL_CONFIG: { + Map replConfigs + = DDLSemanticAnalyzer.getProps((ASTNode) currNode.getChild(0)); + if (null != replConfigs) { + for (Map.Entry config : replConfigs.entrySet()) { + conf.set(config.getKey(), config.getValue()); + } + isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY); } - isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY); + break; } - } else if (ast.getChild(currNode).getType() == TOK_TABNAME) { - // optional tblName was specified. - tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getChild(0).getText()); - } else { - // TOK_FROM subtree - Tree fromNode = ast.getChild(currNode); - eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); - // skip the first, which is always required - int numChild = 1; - while (numChild < fromNode.getChildCount()) { - if (fromNode.getChild(numChild).getType() == TOK_TO) { - eventTo = - Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); - // skip the next child, since we already took care of it - numChild++; - } else if (fromNode.getChild(numChild).getType() == TOK_LIMIT) { - maxEventLimit = - Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); - // skip the next child, since we already took care of it - numChild++; + case TOK_REPL_TABLES: { + setReplDumpTablesList(currNode); + break; + } + case TOK_FROM: { + // TOK_FROM subtree + Tree fromNode = currNode; + eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); + + // Skip the first, which is always required + int fromChildIdx = 1; + while (fromChildIdx < fromNode.getChildCount()) { + if (fromNode.getChild(fromChildIdx).getType() == TOK_TO) { + eventTo = + Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(fromChildIdx + 1).getText())); + // Skip the next child, since we already took care of it + fromChildIdx++; + } else if (fromNode.getChild(fromChildIdx).getType() == TOK_LIMIT) { + maxEventLimit = + Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(fromChildIdx + 1).getText())); + // Skip the next child, since we already took care of it + fromChildIdx++; + } + // move to the next child in FROM tree + fromChildIdx++; } - // move to the next child in FROM tree - numChild++; + break; + } + default: { + throw new SemanticException("Unrecognized token in REPL DUMP statement."); } } - // move to the next root node - currNode++; + // Move to the next root node + childIdx++; } for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { @@ -196,15 +235,17 @@ private void initReplDump(ASTNode ast) throws HiveException { // REPL DUMP private void analyzeReplDump(ASTNode ast) throws SemanticException { - LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) - + "." 
+ String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to " - + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit)); + try { + initReplDump(ast); + } catch (HiveException e) { + throw new SemanticException(e.getMessage(), e); + } + try { ctx.setResFile(ctx.getLocalTmpPath()); Task replDumpWorkTask = TaskFactory .get(new ReplDumpWork( - dbNameOrPattern, - tblNameOrPattern, + replScope, eventFrom, eventTo, ErrorMsg.INVALID_PATH.getMsg(ast), @@ -212,15 +253,13 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { ctx.getResFile().toUri().toString() ), conf); rootTasks.add(replDumpWorkTask); - if (dbNameOrPattern != null) { - for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { - if (tblNameOrPattern != null) { - for (String tblName : Utils.matchesTbl(db, dbName, tblNameOrPattern)) { - inputs.add(new ReadEntity(db.getTable(dbName, tblName))); - } - } else { - inputs.add(new ReadEntity(db.getDatabase(dbName))); + for (String dbName : Utils.matchesDb(db, replScope.getDbName())) { + if (!replScope.includeAllTables()) { + for (String tblName : Utils.matchesTbl(db, dbName, replScope)) { + inputs.add(new ReadEntity(db.getTable(dbName, tblName))); } + } else { + inputs.add(new ReadEntity(db.getDatabase(dbName))); } } setFetchTask(createFetchTask(dumpSchema)); @@ -262,16 +301,13 @@ private void initReplLoad(ASTNode ast) throws SemanticException { ASTNode childNode = (ASTNode) ast.getChild(i); switch (childNode.getToken().getType()) { case TOK_DBNAME: - dbNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); - break; - case TOK_TABNAME: - tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); + replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText())); break; case TOK_REPL_CONFIG: setConfigs((ASTNode) childNode.getChild(0)); break; default: - throw new SemanticException("Unrecognized token in REPL LOAD statement"); + throw new SemanticException("Unrecognized token in REPL LOAD statement."); } } } @@ -319,13 +355,11 @@ private void initReplLoad(ASTNode ast) throws SemanticException { * 36/ */ private void analyzeReplLoad(ASTNode ast) throws SemanticException { - LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "." - + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path)); + initReplLoad(ast); - // for analyze repl load, we walk through the dir structure available in the path, + // For analyze repl load, we walk through the dir structure available in the path, // looking at each db, and then each table, and then setting up the appropriate // import job in its place. 
- try { assert(path != null); Path loadPath = new Path(path); @@ -377,8 +411,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } else { LOG.debug("{} contains an bootstrap dump", loadPath); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo(), + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(), + queryState.getLineageState(), evDump, dmd.getEventTo(), dirLocationsToCopy(loadPath, evDump)); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } catch (Exception e) { @@ -431,49 +465,32 @@ private void setConfigs(ASTNode node) throws SemanticException { // REPL STATUS private void initReplStatus(ASTNode ast) throws SemanticException{ - dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + replScope.setDbName(PlanUtils.stripQuotes(ast.getChild(0).getText())); int numChildren = ast.getChildCount(); for (int i = 1; i < numChildren; i++) { ASTNode childNode = (ASTNode) ast.getChild(i); - switch (childNode.getToken().getType()) { - case TOK_TABNAME: - tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); - break; - case TOK_REPL_CONFIG: + if (childNode.getToken().getType() == TOK_REPL_CONFIG) { setConfigs((ASTNode) childNode.getChild(0)); - break; - default: - throw new SemanticException("Unrecognized token in REPL STATUS statement"); + } else { + throw new SemanticException("Unrecognized token in REPL STATUS statement."); } } } private void analyzeReplStatus(ASTNode ast) throws SemanticException { - LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern) - + "." + String.valueOf(tblNameOrPattern)); + initReplStatus(ast); + String dbNameOrPattern = replScope.getDbName(); String replLastId = null; try { - if (tblNameOrPattern != null) { - // Checking for status of table - Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern); - if (tbl != null) { - inputs.add(new ReadEntity(tbl)); - Map params = tbl.getParameters(); - if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { - replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - } - } - } else { - // Checking for status of a db - Database database = db.getDatabase(dbNameOrPattern); - if (database != null) { - inputs.add(new ReadEntity(database)); - Map params = database.getParameters(); - if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { - replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - } + // Checking for status of a db + Database database = db.getDatabase(dbNameOrPattern); + if (database != null) { + inputs.add(new ReadEntity(database)); + Map params = database.getParameters(); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); } } } catch (HiveException e) { @@ -484,7 +501,7 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException { prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string"); setFetchTask(createFetchTask("last_repl_id#string")); LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}", - String.valueOf(replLastId), ctx.getResFile(), conf); + replLastId, ctx.getResFile(), conf); } private void prepareReturnValues(List values, String schema) throws SemanticException { 
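Editorial aside, not part of the patch: a minimal sketch of how the ReplScope built by initReplDump above is expected to drive table selection on the dump side, assuming the helpers this patch introduces (Utils.matchesDb, Utils.getAllTables with a ReplScope argument, ReplScope.getDbName). The class and method names below are hypothetical, for illustration only.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;

final class ReplScopeDumpPreview {
  // Collect the fully qualified names of the tables a REPL DUMP of this scope would cover.
  static List<String> tablesSelectedForDump(Hive db, ReplScope replScope) throws HiveException {
    List<String> selected = new ArrayList<>();
    for (String dbName : Utils.matchesDb(db, replScope.getDbName())) {
      // getAllTables() drops VALUES__TMP tables and, when a scope is passed, anything the
      // scope's include/exclude patterns reject via tableIncludedInReplScope().
      for (String table : Utils.getAllTables(db, dbName, replScope)) {
        selected.add(dbName + "." + table);
      }
    }
    return selected;
  }
}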
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index b09ec25e4b..ce5ef06830 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -88,18 +89,24 @@ public static void writeOutput(List values, Path outputFile, HiveConf hi public static Iterable matchesTbl(Hive db, String dbName, String tblPattern) throws HiveException { if (tblPattern == null) { - return getAllTables(db, dbName); + return getAllTables(db, dbName, null); } else { return db.getTablesByPattern(dbName, tblPattern); } } - public static Collection getAllTables(Hive db, String dbName) throws HiveException { + public static Iterable matchesTbl(Hive db, String dbName, ReplScope replScope) + throws HiveException { + return getAllTables(db, dbName, replScope); + } + + public static Collection getAllTables(Hive db, String dbName, ReplScope replScope) throws HiveException { return Collections2.filter(db.getAllTables(dbName), tableName -> { - assert tableName != null; + assert(tableName != null); return !tableName.toLowerCase().startsWith( - SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()); + SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()) + && ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); }); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 5c2b3d1883..995d634ed9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -18,10 +18,12 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; @@ -101,6 +103,20 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv createDumpFile(context, qlMdTable, qlPtns, fileListArray); } + private List getAllWriteEventInfo(Context withinContext) throws Exception { + String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName()); + RawStore rawStore = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf); + List writeEventInfoList + = rawStore.getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, null); + return ((writeEventInfoList == null) + ? 
null + : new ArrayList<>(Collections2.filter(writeEventInfoList, + writeEventInfo -> { + assert(writeEventInfo != null); + return withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable()); + }))); + } + @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON); @@ -125,15 +141,12 @@ public void handle(Context withinContext) throws Exception { "not dumping acid tables."); replicatingAcidEvents = false; } - String contextDbName = withinContext.dbName == null ? null : - StringUtils.normalizeIdentifier(withinContext.dbName); - String contextTableName = withinContext.tableName == null ? null : - StringUtils.normalizeIdentifier(withinContext.tableName); + List writeEventInfoList = null; if (replicatingAcidEvents) { - writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf). - getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName); + writeEventInfoList = getAllWriteEventInfo(withinContext); } + int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0); if (numEntry != 0) { eventMessage.addWriteEventInfo(writeEventInfoList); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index ec35f4e94d..0d60c3136d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; - import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -40,18 +40,16 @@ final Hive db; final HiveConf hiveConf; final ReplicationSpec replicationSpec; - final String dbName; - final String tableName; + final ReplScope replScope; public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec, String dbName, String tableName) { + ReplicationSpec replicationSpec, ReplScope replScope) { this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; this.hiveConf = hiveConf; this.replicationSpec = replicationSpec; - this.dbName = dbName; - this.tableName = tableName; + this.replScope = replScope; } public Context(Context other) { @@ -60,11 +58,10 @@ public Context(Context other) { this.db = other.db; this.hiveConf = other.hiveConf; this.replicationSpec = other.replicationSpec; - this.dbName = other.dbName; - this.tableName = other.tableName; + this.replScope = other.replScope; } - public void setEventRoot(Path eventRoot) { + void setEventRoot(Path eventRoot) { this.eventRoot = eventRoot; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java index d3f3306366..599503a884 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java @@ -44,7 +44,7 @@ AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload()); Task abortTxnTask = TaskFactory.get( - 
new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName, + new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, null, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()), context.hiveConf ); @@ -52,7 +52,7 @@ // For warehouse level dump, don't update the metadata of database as we don't know this txn is for which database. // Anyways, if this event gets executed again, it is taken care of. if (!context.isDbNameEmpty()) { - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, null, null); } context.log.debug("Added Abort txn task : {}", abortTxnTask.getId()); return Collections.singletonList(abortTxnTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java index b2e90fe752..660f110d20 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddForeignKeyHandler.java @@ -37,7 +37,7 @@ throws SemanticException { AddForeignKeyMessage msg = deserializer.getAddForeignKeyMessage(context.dmd.getPayload()); - List fks = null; + List fks; try { fks = msg.getForeignKeys(); } catch (Exception e) { @@ -54,7 +54,7 @@ } String actualDbName = context.isDbNameEmpty() ? fks.get(0).getFktable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? fks.get(0).getFktable_name() : context.tableName; + String actualTblName = fks.get(0).getFktable_name(); for (SQLForeignKey fk : fks) { // If parent table is in the same database, change it to the actual db on destination diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java index 4273e445fb..db18e37fd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddNotNullConstraintHandler.java @@ -37,7 +37,7 @@ throws SemanticException { AddNotNullConstraintMessage msg = deserializer.getAddNotNullConstraintMessage(context.dmd.getPayload()); - List nns = null; + List nns; try { nns = msg.getNotNullConstraints(); } catch (Exception e) { @@ -54,7 +54,7 @@ } String actualDbName = context.isDbNameEmpty() ? nns.get(0).getTable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? 
nns.get(0).getTable_name() : context.tableName; + String actualTblName = nns.get(0).getTable_name(); for (SQLNotNullConstraint nn : nns) { nn.setTable_db(actualDbName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java index 6cb4722dd4..ea7f1dc667 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddPrimaryKeyHandler.java @@ -37,7 +37,7 @@ throws SemanticException { AddPrimaryKeyMessage msg = deserializer.getAddPrimaryKeyMessage(context.dmd.getPayload()); - List pks = null; + List pks; try { pks = msg.getPrimaryKeys(); } catch (Exception e) { @@ -54,7 +54,7 @@ } String actualDbName = context.isDbNameEmpty() ? pks.get(0).getTable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? pks.get(0).getTable_name() : context.tableName; + String actualTblName = pks.get(0).getTable_name(); for (SQLPrimaryKey pk : pks) { pk.setTable_db(actualDbName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java index 9b010d7c74..e0b708bb93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AddUniqueConstraintHandler.java @@ -37,7 +37,7 @@ throws SemanticException { AddUniqueConstraintMessage msg = deserializer.getAddUniqueConstraintMessage(context.dmd.getPayload()); - List uks = null; + List uks; try { uks = msg.getUniqueConstraints(); } catch (Exception e) { @@ -54,7 +54,7 @@ } String actualDbName = context.isDbNameEmpty() ? uks.get(0).getTable_db() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? uks.get(0).getTable_name() : context.tableName; + String actualTblName = uks.get(0).getTable_name(); for (SQLUniqueConstraint uk : uks) { uk.setTable_db(actualDbName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java index 63f2577bb5..bb4402f556 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java @@ -46,13 +46,11 @@ String dbName = (context.dbName != null && !context.dbName.isEmpty() ? context.dbName : msg.getDB()); - // The context table name can be null if repl load is done on a full db. - // But we need table name for alloc write id and that is received from source. - String tableName = (context.tableName != null && !context.tableName.isEmpty() ? context.tableName : msg - .getTableName()); + // We need table name for alloc write id and that is received from source. + String tableName = msg.getTableName(); // Repl policy should be created based on the table name in context. 
- ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), dbName, tableName, + ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), dbName, tableName, ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec()); Task allocWriteIdTask = TaskFactory.get(work, context.hiveConf); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java index edeaaa26e4..5a527ae0de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java @@ -44,12 +44,6 @@ @Override public List> handle(Context context) throws SemanticException { - - if (!context.isTableNameEmpty()) { - throw new SemanticException( - "Alter Database are not supported for table-level replication"); - } - AlterDatabaseMessage msg = deserializer.getAlterDatabaseMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java index 9f0f705cd3..52098fb04a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java @@ -52,10 +52,10 @@ List> tasks = new ArrayList<>(); String dbName = context.dbName; String tableNamePrev = null; - String tblName = context.tableName; + String tblName = null; - ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, - context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()); + ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, + null, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()); if (numEntry > 0) { context.log.debug("Commit txn handler for txnid " + msg.getTxnId() + " databases : " + msg.getDatabases() + @@ -73,10 +73,10 @@ if (tableNamePrev == null || !(completeName.equals(tableNamePrev))) { // The data location is created by source, so the location should be formed based on the table name in msg. Path location = HiveUtils.getDumpPath(new Path(context.location), actualDBName, actualTblName); - tblName = context.isTableNameEmpty() ? actualTblName : context.tableName; + tblName = actualTblName; // for warehouse level dump, use db name from write event dbName = (context.isDbNameEmpty() ? actualDBName : context.dbName); - Context currentContext = new Context(context, dbName, tblName); + Context currentContext = new Context(context, dbName); currentContext.setLocation(location.toUri().toString()); // Piggybacking in Import logic for now @@ -105,7 +105,7 @@ // For warehouse level dump, don't update the metadata of database as we don't know this txn is for which database. // Anyways, if this event gets executed again, it is taken care of. 
if (!context.isDbNameEmpty()) { - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, null, null); } context.log.debug("Added Commit txn task : {}", commitTxnTask.getId()); if (tasks.isEmpty()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java index 095d377ac6..f6153a64b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeletePartColStatHandler.java @@ -36,7 +36,7 @@ throws SemanticException { context.log.info("Replication of partition stat delete event is not supported yet"); if (!context.isDbNameEmpty()) { - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, null, null); } return Collections.singletonList(TaskFactory.get(new DependencyCollectionWork(), context.hiveConf)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeleteTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeleteTableColStatHandler.java index 488f89dc49..404372a613 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeleteTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DeleteTableColStatHandler.java @@ -36,7 +36,7 @@ throws SemanticException { context.log.info("Replication of table stat delete event is not supported yet"); if (!context.isDbNameEmpty()) { - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, null, null); } return Collections.singletonList(TaskFactory.get(new DependencyCollectionWork(), context.hiveConf)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java index def207eec0..abf05bfd87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java @@ -34,7 +34,7 @@ throws SemanticException { DropConstraintMessage msg = deserializer.getDropConstraintMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? 
msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); String constraintName = msg.getConstraint(); AlterTableDropConstraintDesc dropConstraintsDesc = new AlterTableDropConstraintDesc( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 98a4b71781..d48cf7afb7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -39,7 +39,7 @@ try { DropPartitionMessage msg = deserializer.getDropPartitionMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); Map> partSpecs = ReplUtils.genPartSpecs(new Table(msg.getTableObj()), msg.getPartitions()); if (partSpecs.size() > 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index b1f820ce40..f66a40833f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -34,7 +34,7 @@ throws SemanticException { DropTableMessage msg = deserializer.getDropTableMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." + actualTblName, null, true, true, context.eventOnlyReplicationSpec(), false diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java index fe89ab2f38..1eeacbf430 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java @@ -55,10 +55,8 @@ InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload()); String actualDbName = withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName; - String actualTblName = - withinContext.isTableNameEmpty() ? 
insertMessage.getTable() : withinContext.tableName; + Context currentContext = new Context(withinContext, actualDbName); - Context currentContext = new Context(withinContext, actualDbName, actualTblName); // Piggybacking in Import logic for now TableHandler tableHandler = new TableHandler(); List> tasks = tableHandler.handle(currentContext); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java index 4ae4894797..ad3be67ee6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java @@ -47,7 +47,7 @@ class Context { public String location; - public final String tableName, dbName; + public final String dbName; public final Task precursor; public DumpMetaData dmd; final HiveConf hiveConf; @@ -55,11 +55,10 @@ final org.apache.hadoop.hive.ql.Context nestedContext; final Logger log; - public Context(String dbName, String tableName, String location, + public Context(String dbName, String location, Task precursor, DumpMetaData dmd, HiveConf hiveConf, Hive db, org.apache.hadoop.hive.ql.Context nestedContext, Logger log) { this.dbName = dbName; - this.tableName = tableName; this.location = location; this.precursor = precursor; this.dmd = dmd; @@ -69,9 +68,8 @@ public Context(String dbName, String tableName, String location, this.log = log; } - public Context(Context other, String dbName, String tableName) { + public Context(Context other, String dbName) { this.dbName = dbName; - this.tableName = tableName; this.location = other.location; this.precursor = other.precursor; this.dmd = other.dmd; @@ -81,10 +79,6 @@ public Context(Context other, String dbName, String tableName) { this.log = other.log; } - boolean isTableNameEmpty() { - return StringUtils.isEmpty(tableName); - } - public boolean isDbNameEmpty() { return StringUtils.isEmpty(dbName); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java index 5dcc44e56b..6123371679 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java @@ -43,7 +43,7 @@ OpenTxnMessage msg = deserializer.getOpenTxnMessage(context.dmd.getPayload()); Task openTxnTask = TaskFactory.get( - new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName, + new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, null, msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()), context.hiveConf ); @@ -51,7 +51,7 @@ // For warehouse level dump, don't update the metadata of database as we don't know this txn is for which database. // Anyways, if this event gets executed again, it is taken care of. 
if (!context.isDbNameEmpty()) { - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, null, null); } context.log.debug("Added Open txn task : {}", openTxnTask.getId()); return Collections.singletonList(openTxnTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java index 2c30641254..c380c0ddf2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java @@ -41,7 +41,7 @@ AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); Map newPartSpec = new LinkedHashMap<>(); Map oldPartSpec = new LinkedHashMap<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java index 53d998200c..54550421fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java @@ -35,12 +35,7 @@ @Override public List> handle(Context context) throws SemanticException { - AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload()); - if (!context.isTableNameEmpty()) { - throw new SemanticException( - "RENAMES of tables are not supported for table-level replication"); - } try { Table tableObjBefore = msg.getTableObjBefore(); Table tableObjAfter = msg.getTableObjAfter(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index 56c2abeccc..664015f27b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -79,7 +79,7 @@ // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs. // Also, REPL LOAD doesn't support external table and hence no location set as well. 
ImportSemanticAnalyzer.prepareImport(false, isLocationSet, isExternal, false, - (context.precursor != null), parsedLocation, context.tableName, context.dbName, + (context.precursor != null), parsedLocation, null, context.dbName, null, context.location, x, updatedMetadata, context.getTxnMgr(), tuple.writeId); Task openTxnTask = x.getOpenTxnTask(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java index 43a976cacd..8d5a50ae81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java @@ -37,7 +37,7 @@ public List> handle(Context context) throws SemanticException { AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); Map partSpec = new LinkedHashMap<>(); org.apache.hadoop.hive.metastore.api.Table tblObj; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java index 18531266d3..8e3186f738 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java @@ -33,7 +33,7 @@ public List> handle(Context context) throws SemanticException { AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload()); String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; - String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; + String actualTblName = msg.getTable(); TruncateTableDesc truncateTableDesc = new TruncateTableDesc( actualDbName + "." 
+ actualTblName, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java index cb85f7db45..bea431c907 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java @@ -41,12 +41,9 @@ // Update tablename and database name in the statistics object ColumnStatistics colStats = upcsm.getColumnStatistics(); ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc(); - if (!context.isTableNameEmpty()) { - colStatsDesc.setTableName(context.tableName); - } if (!context.isDbNameEmpty()) { colStatsDesc.setDbName(context.dbName); - updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, colStatsDesc.getTableName(), null); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java index 371429e645..6160d438e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java @@ -42,12 +42,9 @@ ColumnStatistics colStats = utcsm.getColumnStatistics(); ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc(); colStatsDesc.setDbName(context.dbName); - if (!context.isTableNameEmpty()) { - colStatsDesc.setTableName(context.tableName); - } if (!context.isDbNameEmpty()) { updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, - context.tableName, null); + colStatsDesc.getTableName(), null); } try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java index 23d5825e92..afa0a09af2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplRemoveFirstIncLoadPendFlagDesc.java @@ -31,7 +31,6 @@ private static final long serialVersionUID = 1L; String databaseName; String tableName; - boolean incLoadPendingFlag; /** * For serialization only. 
@@ -39,10 +38,9 @@ public ReplRemoveFirstIncLoadPendFlagDesc() { } - public ReplRemoveFirstIncLoadPendFlagDesc(String databaseName, String tableName) { + public ReplRemoveFirstIncLoadPendFlagDesc(String databaseName) { super(); this.databaseName = databaseName; - this.tableName = tableName; } @Explain(displayName="db_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -53,13 +51,4 @@ public String getDatabaseName() { public void setDatabaseName(String databaseName) { this.databaseName = databaseName; } - - @Explain(displayName="table_name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 7a58dba877..a412d435fd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -39,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; @@ -103,13 +103,14 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throws Exception { List tableList = Arrays.asList("a1", "a2"); String dbRandomKey = "akeytoberandom"; + ReplScope replScope = new ReplScope("default"); mockStatic(Utils.class); when(Utils.matchesDb(same(hive), eq("default"))) .thenReturn(Collections.singletonList("default")); - when(Utils.getAllTables(same(hive), eq("default"))).thenReturn(tableList); + when(Utils.getAllTables(same(hive), eq("default"), eq(replScope))).thenReturn(tableList); when(Utils.setDbBootstrapDumpState(same(hive), eq("default"))).thenReturn(dbRandomKey); - when(Utils.matchesTbl(same(hive), eq("default"), anyString())).thenReturn(tableList); + when(Utils.matchesTbl(same(hive), eq("default"), eq(replScope))).thenReturn(tableList); when(hive.getAllFunctions()).thenReturn(Collections.emptyList()); @@ -138,7 +139,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, task.initialize(queryState, null, null, null); task.setWork( - new ReplDumpWork("default", "", + new ReplDumpWork(replScope, Long.MAX_VALUE, Long.MAX_VALUE, "", Integer.MAX_VALUE, "") ); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java index a034723faf..9db4e9612d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java @@ -75,9 +75,11 @@ private static ASTNode assertFromEvent(final int expectedNumberOfChildren, ASTNo private static void assertTableName(ASTNode root) { ASTNode child = (ASTNode) root.getChild(1); - assertEquals("TOK_TABNAME", child.getText()); + assertEquals("TOK_REPL_TABLES", 
child.getText()); assertEquals(1, child.getChildCount()); - assertEquals("test_table", child.getChild(0).getText()); + assertEquals("TOK_REPL_TABLES_LIST", child.getChild(0).getText()); + assertEquals(1, child.getChild(0).getChildCount()); + assertEquals("'test_table'", child.getChild(0).getChild(0).getText()); } private static void assertDatabase(final int expectedNumberOfChildren, ASTNode root) { @@ -108,14 +110,14 @@ public void parseDb() throws ParseException { @Test public void parseTableName() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table"); + ASTNode root = parse("repl dump testDb.['test_table']"); assertDatabase(2, root); assertTableName(root); } @Test public void parseFromEventId() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100"); + ASTNode root = parse("repl dump testDb.['test_table'] from 100"); assertDatabase(3, root); assertTableName(root); assertFromEvent(1, root); @@ -123,7 +125,7 @@ public void parseFromEventId() throws ParseException { @Test public void parseToEventId() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100 to 200"); + ASTNode root = parse("repl dump testDb.['test_table'] from 100 to 200"); assertDatabase(3, root); assertTableName(root); ASTNode fromClauseRootNode = assertFromEvent(3, root); @@ -132,7 +134,7 @@ public void parseToEventId() throws ParseException { @Test public void parseLimit() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100 to 200 limit 10"); + ASTNode root = parse("repl dump testDb.['test_table'] from 100 to 200 limit 10"); assertDatabase(3, root); assertTableName(root); ASTNode fromClauseRootNode = assertFromEvent(5, root); @@ -160,7 +162,7 @@ public void parseDb() throws ParseException { @Test public void parseTableName() throws ParseException { ASTNode root = - parse("repl dump testDb.test_table with ('key.1'='value.1','key.2'='value.2')"); + parse("repl dump testDb.['test_table'] with ('key.1'='value.1','key.2'='value.2')"); assertDatabase(3, root); assertTableName(root); assertWithClause(root, 2); @@ -168,7 +170,7 @@ public void parseTableName() throws ParseException { @Test public void parseFromEventId() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100 " + ASTNode root = parse("repl dump testDb.['test_table'] from 100 " + "with ('key.1'='value.1','key.2'='value.2')"); assertDatabase(4, root); assertTableName(root); @@ -178,7 +180,7 @@ public void parseFromEventId() throws ParseException { @Test public void parseToEventId() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100 to 200 " + ASTNode root = parse("repl dump testDb.['test_table'] from 100 to 200 " + "with ('key.1'='value.1','key.2'='value.2')"); assertDatabase(4, root); assertTableName(root); @@ -189,7 +191,7 @@ public void parseToEventId() throws ParseException { @Test public void parseLimit() throws ParseException { - ASTNode root = parse("repl dump testDb.test_table from 100 to 200 limit 10 " + ASTNode root = parse("repl dump testDb.['test_table'] from 100 to 200 limit 10 " + "with ('key.1'='value.1','key.2'='value.2')"); assertDatabase(4, root); assertTableName(root); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java index 7b6c3e7507..3c7ef1dee8 100644 --- 
a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java @@ -73,7 +73,7 @@ public void setup() { MetaData metadata = new MetaData(null, null, null, null, functionObj); Context context = - new Context("primaryDb", null, null, null, null, hiveConf, null, null, logger); + new Context("primaryDb", null, null, null, hiveConf, null, null, logger); when(hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR)) .thenReturn("/someBasePath/withADir/"); function = new PrimaryToReplicaResourceFunction(context, metadata, "replicaDbName"); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java similarity index 96% rename from standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java index e25189d35d..c2c8e4b20f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.common; +package org.apache.hadoop.hive.common.repl; /** * A class that defines the constant strings used by the replication implementation. diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java new file mode 100644 index 0000000000..b8c95d5e9b --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.repl; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Class that stores the replication scope. Replication scope includes the details of database and + * tables included under the scope of replication. + */ +public class ReplScope implements Serializable { + private String dbName; + private Pattern dbNamePattern; + private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP; null when the policy does not list tables to include. + private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP; null when the policy does not list tables to exclude.
+ + public ReplScope() { + } + + public ReplScope(String dbName) { + setDbName(dbName); + } + + public void setDbName(String dbName) { + this.dbName = dbName; + this.dbNamePattern = (((dbName == null) || "*".equals(dbName)) + ? null : Pattern.compile(dbName, Pattern.CASE_INSENSITIVE)); + } + + public String getDbName() { + return dbName; + } + + public void setIncludedTablePatterns(List<String> includedTableNamePatterns) { + this.includedTableNamePatterns = compilePatterns(includedTableNamePatterns); + } + + public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) { + this.excludedTableNamePatterns = compilePatterns(excludedTableNamePatterns); + } + + public boolean includeAllTables() { + return ((includedTableNamePatterns == null) + && (excludedTableNamePatterns == null)); + } + + public boolean includedInReplScope(final String dbName, final String tableName) { + return dbIncludedInReplScope(dbName) && tableIncludedInReplScope(tableName); + } + + public boolean dbIncludedInReplScope(final String dbName) { + return (dbNamePattern == null) || dbNamePattern.matcher(dbName).matches(); + } + + public boolean tableIncludedInReplScope(final String tableName) { + if (tableName == null) { + // If the input tableName is null, this is a DB-level event and should always be included, + // as this is DB-level replication with a list of included/excluded tables. + return true; + } + return (inTableIncludedList(tableName) && !inTableExcludedList(tableName)); + } + + private List<Pattern> compilePatterns(List<String> patterns) { + if (patterns == null || patterns.isEmpty()) { + return null; + } + List<Pattern> compiledPatterns = new ArrayList<>(); + for (String pattern : patterns) { + // Compile the patterns case-insensitively because events/HMS store table names in lower case. + compiledPatterns.add(Pattern.compile(pattern, Pattern.CASE_INSENSITIVE)); + } + return compiledPatterns; + } + + private boolean tableMatchAnyPattern(final String tableName, List<Pattern> tableNamePatterns) { + assert(tableNamePatterns != null); + for (Pattern tableNamePattern : tableNamePatterns) { + if (tableNamePattern.matcher(tableName).matches()) { + return true; + } + } + return false; + } + + private boolean inTableIncludedList(final String tableName) { + if (includedTableNamePatterns == null) { + // If the included list is null, the repl policy is full-db replication, + // so all tables must be included. + return true; + } + return tableMatchAnyPattern(tableName, includedTableNamePatterns); + } + + private boolean inTableExcludedList(final String tableName) { + if (excludedTableNamePatterns == null) { + // If the excluded-tables list is null, all tables in the included list must be accepted. + return false; + } + return tableMatchAnyPattern(tableName, excludedTableNamePatterns); + } +} diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java new file mode 100644 index 0000000000..8e9ba1a2ba --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package grouping common replication classes. + */ +package org.apache.hadoop.hive.common.repl; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index aeaa0ec080..1249058325 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -21,7 +21,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.common.ReplConst; +import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java index 84302d62bf..b0347ce5c3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; public abstract class BasicFilter implements NotificationFilter { @Override @@ -29,5 +30,11 @@ public boolean accept(final NotificationEvent event) { return shouldAccept(event); } + boolean isTxnRelatedEvent(final NotificationEvent event) { + return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT))); + } + abstract boolean shouldAccept(final NotificationEvent event); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java index 712c12c895..fc4c9caa86 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -18,7 +18,6 @@ package 
org.apache.hadoop.hive.metastore.messaging.event.filters; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import java.util.regex.Pattern; @@ -40,12 +39,6 @@ public DatabaseAndTableFilter(final String databaseNameOrPattern, final String t this.tableName = tableName; } - private boolean isTxnRelatedEvent(final NotificationEvent event) { - return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) || - (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) || - (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT))); - } - @Override boolean shouldAccept(final NotificationEvent event) { if ((dbPattern == null) || isTxnRelatedEvent(event)) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java new file mode 100644 index 0000000000..e6a3e5cd7e --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.common.repl.ReplScope; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * Notification filter that accepts an event only if its database and table are included in the given replication scope. + */ +public class ReplEventFilter extends BasicFilter { + private final ReplScope replScope; + + public ReplEventFilter(final ReplScope replScope) { + this.replScope = replScope; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + // All txn related events are global and should always be accepted. + // Any other event is accepted only if its DB/table names are included in the replication scope. + return (isTxnRelatedEvent(event) + || replScope.includedInReplScope(event.getDbName(), event.getTableName())); + } +}
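Taken together, the pieces above wire table-level replication policies end to end: the parser emits TOK_REPL_TABLES/TOK_REPL_TABLES_LIST nodes for a policy such as repl dump testDb.['test_table'], ReplDumpWork now carries a ReplScope instead of a plain db/table pair, and ReplEventFilter consults that scope when filtering notification events. The snippet below is a minimal sketch, not part of the patch, showing how ReplScope behaves once include/exclude patterns are set; the ReplScopeDemo class name and the literal pattern strings are illustrative only.

import org.apache.hadoop.hive.common.repl.ReplScope;

import java.util.Arrays;
import java.util.Collections;

public class ReplScopeDemo {
  public static void main(String[] args) {
    // Scope over database "testdb": include tables matching t[0-9]+, but exclude t3.
    ReplScope scope = new ReplScope("testdb");
    scope.setIncludedTablePatterns(Arrays.asList("t[0-9]+"));
    scope.setExcludedTablePatterns(Collections.singletonList("t3"));

    System.out.println(scope.includeAllTables());                   // false: table-level policy
    System.out.println(scope.includedInReplScope("testdb", "t1"));  // true: matches the include pattern
    System.out.println(scope.includedInReplScope("testdb", "t3"));  // false: explicitly excluded
    System.out.println(scope.includedInReplScope("otherdb", "t1")); // false: database not in scope
    System.out.println(scope.tableIncludedInReplScope(null));       // true: DB-level events always pass
  }
}

On the metastore side, the same object can back a notification filter via new ReplEventFilter(scope): its accept() admits transaction events unconditionally and any other event whose database/table pair satisfies includedInReplScope.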