diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
new file mode 100644
index 0000000000..4e8931eab1
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests table-level replication scenarios.
+ */
+public class TestTableLevelReplicationScenarios extends BaseReplicationScenariosAcidTables {
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+        UserGroupInformation.getCurrentUser().getUserName());
+
+    internalBeforeClassSetup(overrides, TestTableLevelReplicationScenarios.class);
+  }
+
+  enum CreateTableType {
+    FULL_ACID, MM_ACID, NON_ACID, EXTERNAL
+  }
+
+  class CreateTableInfo {
+    String tableName;
+    CreateTableType type;
+    boolean isPartitioned;
+
+    CreateTableInfo(String tableName, CreateTableType type, boolean isPartitioned) {
+      this.tableName = tableName;
+      this.type = type;
+      this.isPartitioned = isPartitioned;
+    }
+  }
+
+  private void createTables(List<CreateTableInfo> createTblsInfo) throws Throwable {
+    for (CreateTableInfo tblInfo : createTblsInfo) {
+      StringBuilder strBuilder = new StringBuilder("create ");
+      if (tblInfo.type == CreateTableType.EXTERNAL) {
+        strBuilder.append(" external ");
+      }
+      strBuilder.append(" table ").append(primaryDbName).append(".").append(tblInfo.tableName);
+
+      if (tblInfo.isPartitioned) {
+        strBuilder.append(" (a int) partitioned by (b int) ");
+      } else {
+        strBuilder.append(" (a int, b int) ");
+      }
+
+      if (tblInfo.type == CreateTableType.FULL_ACID) {
+        strBuilder.append(" clustered by (a) into 2 buckets stored as orc "
+            + "tblproperties (\"transactional\"=\"true\")");
+      } else if (tblInfo.type == CreateTableType.MM_ACID) {
+        strBuilder.append(" tblproperties(\"transactional\"=\"true\", "
+            + "\"transactional_properties\"=\"insert_only\")");
+      }
+
+      String createTableCmd = strBuilder.toString();
+      primary.run("use " + primaryDbName)
+          .run(createTableCmd)
+          .run("insert into " + tblInfo.tableName + " values(1, 10)");
+    }
+  }
+
+  private void createTables(String[] tableNames, CreateTableType type) throws Throwable {
+    List<CreateTableInfo> createTablesInfo = new ArrayList<>();
+    for (String tblName : tableNames) {
+      createTablesInfo.add(new CreateTableInfo(tblName, type, false));
+    }
+    createTables(createTablesInfo);
+  }
+
+  private void replicateAndVerify(String replPolicy, String lastReplId,
+                                  List<String> dumpWithClause,
+                                  List<String> loadWithClause,
+                                  String[] expectedTables) throws Throwable {
+    if (dumpWithClause == null) {
+      dumpWithClause = new ArrayList<>();
+    }
+    if (loadWithClause == null) {
+      loadWithClause = new ArrayList<>();
+    }
+
+    // For bootstrap replication, drop the target database before triggering it.
+    if (lastReplId == null) {
+      replica.run("drop database if exists " + replicatedDbName + " cascade");
+    }
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .dump(replPolicy, lastReplId, dumpWithClause);
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(expectedTables);
+  }
+
+  @Test
+  public void testBasicBootstrapWithIncludeList() throws Throwable {
+    String[] originalNonAcidTables = new String[] {"t1", "t2" };
+    String[] originalFullAcidTables = new String[] {"t3", "t4" };
+    String[] originalMMAcidTables = new String[] {"t5" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify that only 3 tables are replicated to the target.
+    String replPolicy = primaryDbName + ".['t1', 't4', 't5']";
+    String[] replicatedTables = new String[] {"t1", "t4", "t5" };
+    replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+  }
+
+  @Test
+  public void testBasicBootstrapWithIncludeAndExcludeList() throws Throwable {
+    String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t100" };
+    createTables(originalTables, CreateTableType.NON_ACID);
+
+    // Replicate and verify that only 3 tables are replicated to the target.
+    String replPolicy = primaryDbName + ".['t1*', 't3'].['t100']";
+    String[] replicatedTables = new String[] {"t1", "t11", "t3" };
+    replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+  }
+
+  @Test
+  public void testBasicIncrementalWithIncludeList() throws Throwable {
+    WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+
+    String[] originalNonAcidTables = new String[] {"t1", "t2" };
+    String[] originalFullAcidTables = new String[] {"t3", "t4" };
+    String[] originalMMAcidTables = new String[] {"t5" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify that only 2 tables are replicated to the target.
+    String replPolicy = primaryDbName + ".['t1', 't5']";
+    String[] replicatedTables = new String[] {"t1", "t5" };
+    replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables);
+  }
+
+  @Test
+  public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable {
+    WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
+        .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+
+    String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" };
+    createTables(originalTables, CreateTableType.NON_ACID);
+
+    // Replicate and verify that only 3 tables are replicated to the target.
+    String replPolicy = primaryDbName + ".['t1+', 't2'].['t11', 't3']";
+    String[] replicatedTables = new String[] {"t1", "t111", "t2" };
+    replicateAndVerify(replPolicy, tupleBootstrap.lastReplicationId, null, null, replicatedTables);
+  }
+
+  @Test
+  public void testReplDumpWithIncorrectTablePolicy() throws Throwable {
+    String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" };
+    createTables(originalTables, CreateTableType.NON_ACID);
+
+    // Repl dump should fail with SemanticException as the repl policy syntax is incorrect.
+    String[] replicatedTables = new String[]{};
+    boolean failed;
+    String[] invalidReplPolicies = new String[] {
+        primaryDbName + ".t1.t2", // Two explicit table names not allowed.
+        primaryDbName + ".['t1'].t2", // Table name and include list not allowed.
+        primaryDbName + ".t1.['t2']", // Table name and exclude list not allowed.
+        primaryDbName + ".[t1].t2" // Table name and include list not allowed.
+    };
+    for (String replPolicy : invalidReplPolicies) {
+      failed = false;
+      try {
+        replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+      } catch (Exception ex) {
+        LOG.info("Got exception: {}", ex.getMessage());
+        Assert.assertTrue(ex instanceof SemanticException);
+        Assert.assertTrue(ex.getMessage().equals(ErrorMsg.REPL_INCORRECT_SYNTAX_FOR_REPL_POLICY.getMsg()));
+        failed = true;
+      }
+      Assert.assertTrue(failed);
+    }
+
+    // Invalid repl policies where an abruptly placed DOT causes a ParseException during REPL dump.
+    invalidReplPolicies = new String[]{
+        primaryDbName + ".['t1+'].",
+        primaryDbName + "..[]"
+    };
+    for (String replPolicy : invalidReplPolicies) {
+      failed = false;
+      try {
+        replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+      } catch (Exception ex) {
+        LOG.info("Got exception: {}", ex.getMessage());
+        Assert.assertTrue(ex instanceof ParseException);
+        failed = true;
+      }
+      Assert.assertTrue(failed);
+    }
+
+    // Invalid pattern: the table pattern is not enclosed within single or double quotes.
+    String replPolicy = primaryDbName + ".[t1].[t2]";
+    failed = false;
+    try {
+      replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+    } catch (Exception ex) {
+      LOG.info("Got exception: {}", ex.getMessage());
+      Assert.assertTrue(ex instanceof SemanticException);
+      Assert.assertTrue(ex.getMessage().equals(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg()));
+      failed = true;
+    }
+    Assert.assertTrue(failed);
+  }
+
+  @Test
+  public void testFullDbBootstrapReplicationWithDifferentReplPolicyFormats() throws Throwable {
+    String[] originalTables = new String[] {"t1", "t200", "t3333" };
+    createTables(originalTables, CreateTableType.NON_ACID);
+
+    // List of repl policy formats that lead to full DB replication.
+    String[] fullDbReplPolicies = new String[] {
+        primaryDbName + ".[].[]",
+        primaryDbName + ".['.*?']",
+        primaryDbName + ".['.*?'].[]"
+    };
+
+    // Replicate and verify that all 3 tables are replicated to the target.
+    for (String replPolicy : fullDbReplPolicies) {
+      replicateAndVerify(replPolicy, null, null, null, originalTables);
+    }
+  }
+
+  @Test
+  public void testCaseInSensitiveNatureOfReplPolicy() throws Throwable {
+    String[] originalTables = new String[] {"a1", "aA11", "B2", "Cc3" };
+    createTables(originalTables, CreateTableType.NON_ACID);
+
+    // Replicate and verify that 2 tables are replicated as per the policy.
+    String replPolicy = primaryDbName.toUpperCase() + ".['.*a1+', 'cc3', 'B2'].['AA1+', 'b2']";
+    String[] replicatedTables = new String[] {"a1", "cc3" };
+    replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 554df3c6bf..f63658883f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -514,6 +514,16 @@
   REPL_LOAD_PATH_NOT_FOUND(20019, "Load path does not exist."),
   REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION(20020,
       "Source of replication (repl.source.for) is not set in the database properties."),
+  REPL_INCORRECT_SYNTAX_FOR_REPL_POLICY(20021,
+      "Incorrect syntax for the replication policy. These are the supported formats. "
+          + "1) <db_name>, "
+          + "2) <db_name>.<table_name>, "
+          + "3) <db_name>.[<include_tables_regex_list>], "
+          + "4) <db_name>.[<include_tables_regex_list>].[<exclude_tables_regex_list>], "
+          + "5) <db_name>.[].[<exclude_tables_regex_list>]"),
+  REPL_INVALID_DB_OR_TABLE_PATTERN(20022,
+      "Invalid pattern for the DB or table name in the replication policy. "
+          + "It should be a valid regex enclosed within single or double quotes."),
   // An exception from runtime that will show the full stack to client
   UNRESOLVED_RT_EXCEPTION(29999, "Runtime Error: {0}", "58004", true),
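As a quick reference for the five formats named in REPL_INCORRECT_SYNTAX_FOR_REPL_POLICY, here is a hedged sketch of policy strings that should satisfy each one (the database and table names are hypothetical):

    // Illustrative policy strings, one per supported format:
    String fullDb         = "salesdb";                           // 1) whole database
    String singleTable    = "salesdb.orders";                    // 2) one table
    String includeOnly    = "salesdb.['ord.*', 'customers']";    // 3) include list only
    String includeExclude = "salesdb.['ord.*'].['orders_tmp']";  // 4) include and exclude lists
    String excludeOnly    = "salesdb.[].['orders_tmp']";         // 5) empty include list matches all tables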
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 2f72e23526..8c963be85d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -33,8 +33,8 @@
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -184,9 +184,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
       bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
       assert (bootDumpBeginReplId >= 0);
-      LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {} and table {}",
-          work.dbNameOrPattern,
-          work.tableNameOrPattern);
+      LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}",
+          work.dbNameOrPattern);
       long timeoutInMs = HiveConf.getTimeVar(conf,
           HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
       waitUntilTime = System.currentTimeMillis() + timeoutInMs;
@@ -201,7 +200,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);

     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
-        new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
+        new ReplEventFilter(work.replScope),
         new EventBoundaryFilter(work.eventFrom, work.eventTo));

     EventUtils.MSClientNotificationFetcher evFetcher
@@ -255,7 +254,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
       Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
       try (Writer writer = new Writer(dumpRoot, conf)) {
-        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           try {
             Table table = hiveDb.getTable(dbName, tableName);
@@ -297,8 +296,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db)
         db,
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId()),
-        work.dbNameOrPattern,
-        work.tableNameOrPattern
+        work.replScope
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
@@ -321,7 +319,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th
     Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
     assert (bootDumpBeginReplId >= 0L);
-    LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern);
+    LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
     long timeoutInMs = HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
@@ -337,7 +335,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th
             " with first incremental dump pending : " + dbName);
       }
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
-          Utils.getAllTables(hiveDb, dbName).size(),
+          Utils.getAllTables(hiveDb, dbName, work.replScope).size(),
           hiveDb.getAllFunctions().size());
       replLogger.startLog();
       Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
@@ -349,7 +347,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th
           conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
               && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
       try (Writer writer = new Writer(dbRoot, conf)) {
-        for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+        for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index d32be969e1..247066c451 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;

 import com.google.common.primitives.Ints;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.LoggerFactory;
@@ -28,7 +29,8 @@
     Explain.Level.DEFAULT,
     Explain.Level.EXTENDED })
 public class ReplDumpWork implements Serializable {
-  final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
+  final ReplScope replScope;
+  final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   final Long eventFrom;
   Long eventTo;
   private Integer maxEventLimit;
@@ -38,11 +40,11 @@
   public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
   }

-  public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern,
+  public ReplDumpWork(ReplScope replScope,
       Long eventFrom, Long eventTo, String astRepresentationForErrorMsg,
       Integer maxEventLimit, String resultTempPath) {
-    this.dbNameOrPattern = dbNameOrPattern;
-    this.tableNameOrPattern = tableNameOrPattern;
+    this.replScope = replScope;
+    this.dbNameOrPattern = replScope.getDbName();
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
     this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index f9f13e1a4c..2fcbe64aa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -19,7 +19,7 @@

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.common.ReplConst;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index bf9aa39307..f368386d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -393,6 +393,8 @@
 TOK_REPL_LOAD;
 TOK_REPL_STATUS;
 TOK_REPL_CONFIG;
 TOK_REPL_CONFIG_LIST;
+TOK_REPL_TABLES;
+TOK_REPL_TABLES_LIST;
 TOK_TO;
 TOK_ONLY;
 TOK_SUMMARY;
@@ -891,13 +893,13 @@ replDumpStatement
 @init { pushMsg("replication dump statement", state); }
 @after { popMsg(state); }
       : KW_REPL KW_DUMP
-        (dbName=identifier) (DOT tblName=identifier)?
+        (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)?
         (KW_FROM (eventId=Number)
           (KW_TO (rangeEnd=Number))?
           (KW_LIMIT (batchSize=Number))?
         )?
         (KW_WITH replConf=replConfigs)?
-    -> ^(TOK_REPL_DUMP $dbName ^(TOK_TABNAME $tblName)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
    ;

 replLoadStatement
@@ -924,6 +926,32 @@ replConfigsList
       keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+)
     ;

+replTableLevelPolicy
+@init { pushMsg("replication table level policy definition", state); }
+@after { popMsg(state); }
+    :
+      ((replTablesIncludeList=replTablesList) (DOT replTablesExcludeList=replTablesList)?)
+      -> ^(TOK_REPL_TABLES $replTablesIncludeList $replTablesExcludeList?)
+    ;
+
+replTablesList
+@init { pushMsg("replication table name or comma separated table names pattern list", state); }
+@after { popMsg(state); }
+    :
+      (replTable=identifier) -> ^(TOK_TABNAME $replTable)
+    |
+      (LSQUARE (tablePattern (COMMA tablePattern)*)? RSQUARE) -> ^(TOK_REPL_TABLES_LIST tablePattern*)
+    ;
+
+tablePattern
+@init { pushMsg("Table name pattern", state); }
+@after { popMsg(state); }
+    :
+      (pattern=StringLiteral) -> $pattern
+    |
+      (identifier) -> TOK_NULL
+    ;
+
 replStatusStatement
 @init { pushMsg("replication status statement", state); }
 @after { popMsg(state); }
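To make the rewrite rules concrete, here is a hedged sketch of the tree this grammar should produce for one illustrative statement (the shape is inferred from the rules above, not taken from a parser run):

    // REPL DUMP salesdb.['t1*', 't2'].['t100']   (hypothetical statement)
    //
    //   TOK_REPL_DUMP
    //     salesdb
    //     TOK_REPL_TABLES
    //       TOK_REPL_TABLES_LIST ('t1*', 't2')   <- include list
    //       TOK_REPL_TABLES_LIST ('t100')        <- exclude list
    //
    // An unquoted pattern such as [t1] takes the identifier branch of tablePattern and is
    // rewritten to TOK_NULL; the semantic analyzer rejects that later with
    // REPL_INVALID_DB_OR_TABLE_PATTERN rather than failing at parse time.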
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index b4b849c4ec..b9c2f7bb1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -57,21 +58,24 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_NULL;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES_LIST;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;

 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
-  // Database name or pattern
-  private String dbNameOrPattern;
-  // Table name or pattern
-  private String tblNameOrPattern;
+  // Replication Scope
+  private ReplScope replScope = new ReplScope();
+
   private Long eventFrom;
   private Long eventTo;
   private Integer maxEventLimit;
+
   // Base path for REPL LOAD
   private String path;
   // Added conf member to set the REPL command specific config entries without affecting the configs
@@ -101,23 +105,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     switch (ast.getToken().getType()) {
     case TOK_REPL_DUMP: {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
-      try {
-        initReplDump(ast);
-      } catch (HiveException e) {
-        throw new SemanticException(e.getMessage(), e);
-      }
       analyzeReplDump(ast);
       break;
     }
     case TOK_REPL_LOAD: {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
-      initReplLoad(ast);
       analyzeReplLoad(ast);
       break;
     }
     case TOK_REPL_STATUS: {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
-      initReplStatus(ast);
       analyzeReplStatus(ast);
       break;
     }
@@ -134,50 +131,110 @@ private void setTxnConfigs() {
     }
   }

+  private boolean isValidTablesList(Tree tablesListTree) {
+    if (tablesListTree.getChildCount() <= 1) {
+      // If there is one or zero child, then it is valid.
+      // For single table replication, the valid format is <db_name>.<table_name>.
+      // To include multiple tables (t1 and t2), the valid format is <db_name>.['t1', 't2'].
+      // So, a single child is always valid, whether it is a table_name or a tables_list.
+      return true;
+    }
+    assert(tablesListTree.getChildCount() == 2);
+
+    // We don't allow input of the format <db_name>.<table_name>.<table_name> or
+    // <db_name>.<table_name>.[<tables_list>].
+    // To include t1* and exclude t100, the valid format is <db_name>.['t1*'].['t100'].
+    // So, if 2 children are there, then both should be table lists.
+    return ((tablesListTree.getChild(0).getType() == TOK_REPL_TABLES_LIST)
+        && (tablesListTree.getChild(1).getType() == TOK_REPL_TABLES_LIST));
+  }
+
   private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
     boolean isMetaDataOnly = false;
-    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
-
-    // skip the first node, which is always required
-    int currNode = 1;
-    while (currNode < numChildren) {
-      if (ast.getChild(currNode).getType() == TOK_REPL_CONFIG) {
-        Map<String, String> replConfigs
-            = DDLSemanticAnalyzer.getProps((ASTNode) ast.getChild(currNode).getChild(0));
-        if (null != replConfigs) {
-          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
-            conf.set(config.getKey(), config.getValue());
+
+    String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    LOG.info("ReplScope: Set DB Name: {}", dbNameOrPattern);
+    replScope.setDbName(dbNameOrPattern);
+
+    // Skip the first node, which is always required
+    int childIdx = 1;
+    while (childIdx < numChildren) {
+      Tree currNode = ast.getChild(childIdx);
+      switch (currNode.getType()) {
+      case TOK_REPL_CONFIG: {
+        Map<String, String> replConfigs
+            = DDLSemanticAnalyzer.getProps((ASTNode) currNode.getChild(0));
+        if (null != replConfigs) {
+          for (Map.Entry<String, String> config : replConfigs.entrySet()) {
+            conf.set(config.getKey(), config.getValue());
+          }
+          isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY);
         }
-        isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY);
+        break;
       }
-      } else if (ast.getChild(currNode).getType() == TOK_TABNAME) {
-        // optional tblName was specified.
-        tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getChild(0).getText());
-      } else {
-        // TOK_FROM subtree
-        Tree fromNode = ast.getChild(currNode);
-        eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
-        // skip the first, which is always required
-        int numChild = 1;
-        while (numChild < fromNode.getChildCount()) {
-          if (fromNode.getChild(numChild).getType() == TOK_TO) {
-            eventTo =
-                Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
-            // skip the next child, since we already took care of it
-            numChild++;
-          } else if (fromNode.getChild(numChild).getType() == TOK_LIMIT) {
-            maxEventLimit =
-                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
-            // skip the next child, since we already took care of it
+      case TOK_REPL_TABLES: {
+        assert(currNode.getChildCount() <= 2);
+        if (!isValidTablesList(currNode)) {
+          LOG.error(ErrorMsg.REPL_INCORRECT_SYNTAX_FOR_REPL_POLICY.getMsg());
+          throw new SemanticException(ErrorMsg.REPL_INCORRECT_SYNTAX_FOR_REPL_POLICY.getMsg());
+        }
+
+        // Traverse the children, which can be a single table_name node, just an include tables
+        // list, or both include and exclude tables lists.
+        for (int listIdx = 0; listIdx < currNode.getChildCount(); listIdx++) {
+          Tree tablesNode = currNode.getChild(listIdx);
+          if (tablesNode.getType() == TOK_TABNAME) {
+            String tableName = tablesNode.getChild(0).getText();
+            LOG.info("ReplScope: Set Table Name: {}", tableName);
+            replScope.setTableName(tableName);
+          } else {
+            List<String> tablesList = new ArrayList<>();
+            for (int child = 0; child < tablesNode.getChildCount(); child++) {
+              Tree tablePatternNode = tablesNode.getChild(child);
+              if (tablePatternNode.getType() == TOK_NULL) {
+                LOG.error(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg());
+                throw new SemanticException(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN.getMsg());
+              }
+              tablesList.add(unescapeSQLString(tablePatternNode.getText()));
+            }
+            if (!tablesList.isEmpty()) {
+              if (listIdx == 0) {
+                LOG.info("ReplScope: Set Included Tables List: {}", tablesList);
+                replScope.setIncludedTablePatterns(tablesList);
+              } else {
+                LOG.info("ReplScope: Set Excluded Tables List: {}", tablesList);
+                replScope.setExcludedTablePatterns(tablesList);
+              }
+            }
+          }
+        }
+        break;
+      }
+      default: {
+        // TOK_FROM subtree
+        Tree fromNode = currNode;
+        eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+        // skip the first, which is always required
+        int numChild = 1;
+        while (numChild < fromNode.getChildCount()) {
+          if (fromNode.getChild(numChild).getType() == TOK_TO) {
+            eventTo =
+                Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          } else if (fromNode.getChild(numChild).getType() == TOK_LIMIT) {
+            maxEventLimit =
+                Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+            // skip the next child, since we already took care of it
+            numChild++;
+          }
+          // move to the next child in FROM tree
          numChild++;
         }
-        // move to the next child in FROM tree
-        numChild++;
       }
       }
       // move to the next root node
-      currNode++;
+      childIdx++;
     }

     for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) {
@@ -196,15 +253,17 @@ private void initReplDump(ASTNode ast) throws HiveException {

   // REPL DUMP
   private void analyzeReplDump(ASTNode ast) throws SemanticException {
-    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) + "."
-        + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
-        + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit));
+    try {
+      initReplDump(ast);
+    } catch (HiveException e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+
     try {
       ctx.setResFile(ctx.getLocalTmpPath());
       Task<ReplDumpWork> replDumpWorkTask = TaskFactory
           .get(new ReplDumpWork(
-              dbNameOrPattern,
-              tblNameOrPattern,
+              replScope,
               eventFrom,
               eventTo,
               ErrorMsg.INVALID_PATH.getMsg(ast),
@@ -212,15 +271,13 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
           ctx.getResFile().toUri().toString()
       ), conf);
       rootTasks.add(replDumpWorkTask);
-      if (dbNameOrPattern != null) {
-        for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) {
-          if (tblNameOrPattern != null) {
-            for (String tblName : Utils.matchesTbl(db, dbName, tblNameOrPattern)) {
-              inputs.add(new ReadEntity(db.getTable(dbName, tblName)));
-            }
-          } else {
-            inputs.add(new ReadEntity(db.getDatabase(dbName)));
+      for (String dbName : Utils.matchesDb(db, replScope.getDbName())) {
+        if (!replScope.includeAllTables()) {
+          for (String tblName : Utils.matchesTbl(db, dbName, replScope)) {
+            inputs.add(new ReadEntity(db.getTable(dbName, tblName)));
           }
+        } else {
+          inputs.add(new ReadEntity(db.getDatabase(dbName)));
         }
       }
       setFetchTask(createFetchTask(dumpSchema));
@@ -262,10 +319,10 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
       ASTNode childNode = (ASTNode) ast.getChild(i);
       switch (childNode.getToken().getType()) {
       case TOK_DBNAME:
-        dbNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+        replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
         break;
       case TOK_TABNAME:
-        tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+        replScope.setTableName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
         break;
       case TOK_REPL_CONFIG:
         setConfigs((ASTNode) childNode.getChild(0));
@@ -319,13 +376,11 @@ private void initReplLoad(ASTNode ast) throws SemanticException {
    * 36/
    */
   private void analyzeReplLoad(ASTNode ast) throws SemanticException {
-    LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
-        + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
+    initReplLoad(ast);

-    // for analyze repl load, we walk through the dir structure available in the path,
+    // For analyze repl load, we walk through the dir structure available in the path,
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
-
     try {
       assert(path != null);
       Path loadPath = new Path(path);
@@ -377,8 +432,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       } else {
         LOG.debug("{} contains an bootstrap dump", loadPath);
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-          tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo(),
+      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+          replScope.getTableName(), queryState.getLineageState(), evDump, dmd.getEventTo(),
           dirLocationsToCopy(loadPath, evDump));
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
@@ -431,13 +486,13 @@ private void setConfigs(ASTNode node) throws SemanticException {

   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
-    dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+    replScope.setDbName(PlanUtils.stripQuotes(ast.getChild(0).getText()));
     int numChildren = ast.getChildCount();
     for (int i = 1; i < numChildren; i++) {
       ASTNode childNode = (ASTNode) ast.getChild(i);
       switch (childNode.getToken().getType()) {
       case TOK_TABNAME:
-        tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText());
+        replScope.setTableName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
         break;
       case TOK_REPL_CONFIG:
         setConfigs((ASTNode) childNode.getChild(0));
@@ -449,9 +504,10 @@ private void initReplStatus(ASTNode ast) throws SemanticException{
   }

   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
-    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
-        + "." + String.valueOf(tblNameOrPattern));
+    initReplStatus(ast);

+    String dbNameOrPattern = replScope.getDbName();
+    String tblNameOrPattern = replScope.getTableName();
     String replLastId = null;

     try {
@@ -484,7 +540,7 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
     setFetchTask(createFetchTask("last_repl_id#string"));
     LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
-        String.valueOf(replLastId), ctx.getResFile(), conf);
+        replLastId, ctx.getResFile(), conf);
   }

   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index b09ec25e4b..ce5ef06830 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -88,18 +89,24 @@ public static void writeOutput(List<String> values, Path outputFile, HiveConf hi
   public static Iterable<String> matchesTbl(Hive db, String dbName, String tblPattern)
       throws HiveException {
     if (tblPattern == null) {
-      return getAllTables(db, dbName);
+      return getAllTables(db, dbName, null);
     } else {
       return db.getTablesByPattern(dbName, tblPattern);
     }
   }

-  public static Collection<String> getAllTables(Hive db, String dbName) throws HiveException {
+  public static Iterable<String> matchesTbl(Hive db, String dbName, ReplScope replScope)
+      throws HiveException {
+    return getAllTables(db, dbName, replScope);
+  }
+
+  public static Collection<String> getAllTables(Hive db, String dbName, ReplScope replScope)
+      throws HiveException {
     return Collections2.filter(db.getAllTables(dbName),
         tableName -> {
-          assert tableName != null;
+          assert(tableName != null);
           return !tableName.toLowerCase().startsWith(
-              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase());
+              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+              && ((replScope == null) || replScope.tableIncludedInReplScope(tableName));
         });
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 5c2b3d1883..995d634ed9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -18,10 +18,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;

+import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
@@ -101,6 +103,20 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv
     createDumpFile(context, qlMdTable, qlPtns, fileListArray);
   }

+  private List<WriteEventInfo> getAllWriteEventInfo(Context withinContext) throws Exception {
+    String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName());
+    RawStore rawStore = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf);
+    List<WriteEventInfo> writeEventInfoList
+        = rawStore.getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, null);
+    return ((writeEventInfoList == null)
+        ? null
+        : new ArrayList<>(Collections2.filter(writeEventInfoList,
+            writeEventInfo -> {
+              assert(writeEventInfo != null);
+              return withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable());
+            })));
+  }
+
   @Override
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON);
@@ -125,15 +141,12 @@ public void handle(Context withinContext) throws Exception {
               "not dumping acid tables.");
       replicatingAcidEvents = false;
     }
-    String contextDbName = withinContext.dbName == null ? null :
-        StringUtils.normalizeIdentifier(withinContext.dbName);
-    String contextTableName = withinContext.tableName == null ? null :
-        StringUtils.normalizeIdentifier(withinContext.tableName);
+
+    List<WriteEventInfo> writeEventInfoList = null;
     if (replicatingAcidEvents) {
-      writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
-          getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
+      writeEventInfoList = getAllWriteEventInfo(withinContext);
     }
+
     int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
     if (numEntry != 0) {
       eventMessage.addWriteEventInfo(writeEventInfoList);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index ec35f4e94d..0d60c3136d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -40,18 +40,16 @@
     final Hive db;
     final HiveConf hiveConf;
     final ReplicationSpec replicationSpec;
-    final String dbName;
-    final String tableName;
+    final ReplScope replScope;

     public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
-                   ReplicationSpec replicationSpec, String dbName, String tableName) {
+                   ReplicationSpec replicationSpec, ReplScope replScope) {
       this.eventRoot = eventRoot;
       this.cmRoot = cmRoot;
       this.db = db;
       this.hiveConf = hiveConf;
       this.replicationSpec = replicationSpec;
-      this.dbName = dbName;
-      this.tableName = tableName;
+      this.replScope = replScope;
     }

     public Context(Context other) {
@@ -60,11 +58,10 @@
       this.db = other.db;
       this.hiveConf = other.hiveConf;
       this.replicationSpec = other.replicationSpec;
-      this.dbName = other.dbName;
-      this.tableName = other.tableName;
+      this.replScope = other.replScope;
     }

-    public void setEventRoot(Path eventRoot) {
+    void setEventRoot(Path eventRoot) {
       this.eventRoot = eventRoot;
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 7a58dba877..a412d435fd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -39,7 +40,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
@@ -103,13 +103,14 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv
   public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throws Exception {
     List<String> tableList = Arrays.asList("a1", "a2");
     String dbRandomKey = "akeytoberandom";
+    ReplScope replScope = new ReplScope("default");

     mockStatic(Utils.class);
     when(Utils.matchesDb(same(hive), eq("default")))
         .thenReturn(Collections.singletonList("default"));
-    when(Utils.getAllTables(same(hive), eq("default"))).thenReturn(tableList);
+    when(Utils.getAllTables(same(hive), eq("default"), eq(replScope))).thenReturn(tableList);
     when(Utils.setDbBootstrapDumpState(same(hive), eq("default"))).thenReturn(dbRandomKey);
-    when(Utils.matchesTbl(same(hive), eq("default"), anyString())).thenReturn(tableList);
+    when(Utils.matchesTbl(same(hive), eq("default"), eq(replScope))).thenReturn(tableList);

     when(hive.getAllFunctions()).thenReturn(Collections.emptyList());
@@ -138,7 +139,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,

     task.initialize(queryState, null, null, null);
     task.setWork(
-        new ReplDumpWork("default", "",
+        new ReplDumpWork(replScope,
            Long.MAX_VALUE, Long.MAX_VALUE, "", Integer.MAX_VALUE, "")
     );
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index a034723faf..b8bba6fcab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -75,9 +75,11 @@ private static ASTNode assertFromEvent(final int expectedNumberOfChildren, ASTNo

   private static void assertTableName(ASTNode root) {
     ASTNode child = (ASTNode) root.getChild(1);
-    assertEquals("TOK_TABNAME", child.getText());
+    assertEquals("TOK_REPL_TABLES", child.getText());
     assertEquals(1, child.getChildCount());
-    assertEquals("test_table", child.getChild(0).getText());
+    assertEquals("TOK_TABNAME", child.getChild(0).getText());
+    assertEquals(1, child.getChild(0).getChildCount());
+    assertEquals("test_table", child.getChild(0).getChild(0).getText());
   }

   private static void assertDatabase(final int expectedNumberOfChildren, ASTNode root) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
similarity index 96%
rename from standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index e25189d35d..c2c8e4b20f 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.common;
+package org.apache.hadoop.hive.common.repl;

 /**
  * A class that defines the constant strings used by the replication implementation.
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
new file mode 100644
index 0000000000..8019bd28fd
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.repl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Class that stores the replication scope. The replication scope includes the details of the
+ * database and tables included under the scope of replication.
+ */
+public class ReplScope implements Serializable {
+  private String dbName;
+  private String tableName;
+
+  private Pattern dbNamePattern;
+  private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP; exists only if tableName == null.
+  private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP; exists only if tableName == null.
+
+  public ReplScope() {
+  }
+
+  public ReplScope(String dbName) {
+    setDbName(dbName);
+  }
+
+  public void setDbName(String dbName) {
+    this.dbName = dbName;
+    this.dbNamePattern = (((dbName == null) || "*".equals(dbName))
+        ? null : Pattern.compile(dbName, Pattern.CASE_INSENSITIVE));
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setIncludedTablePatterns(List<String> includedTableNamePatterns) {
+    this.includedTableNamePatterns = compilePatterns(includedTableNamePatterns);
+  }
+
+  public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) {
+    this.excludedTableNamePatterns = compilePatterns(excludedTableNamePatterns);
+  }
+
+  public boolean includeAllTables() {
+    return ((tableName == null)
+        && (includedTableNamePatterns == null)
+        && (excludedTableNamePatterns == null));
+  }
+
+  public boolean includedInReplScope(final String dbName, final String tableName) {
+    return dbIncludedInReplScope(dbName) && tableIncludedInReplScope(tableName);
+  }
+
+  public boolean dbIncludedInReplScope(final String dbName) {
+    return (dbNamePattern == null) || dbNamePattern.matcher(dbName).matches();
+  }
+
+  public boolean tableIncludedInReplScope(final String tableName) {
+    if (this.tableName != null) {
+      // For single table replication, the table name should match to be included.
+      return this.tableName.equalsIgnoreCase(tableName);
+    }
+    if (tableName == null) {
+      // If the input tableName is null, it is a DB-level event. It should always be included,
+      // as this is DB-level replication with a list of included/excluded tables.
+      return true;
+    }
+    return (inTableIncludedList(tableName) && !inTableExcludedList(tableName));
+  }
+
+  private List<Pattern> compilePatterns(List<String> patterns) {
+    if (patterns == null || patterns.isEmpty()) {
+      return null;
+    }
+    List<Pattern> compiledPatterns = new ArrayList<>();
+    for (String pattern : patterns) {
+      // Compile the pattern as case-insensitive because events/HMS carry table names in lower case.
+      compiledPatterns.add(Pattern.compile(pattern, Pattern.CASE_INSENSITIVE));
+    }
+    return compiledPatterns;
+  }
+
+  private boolean tableMatchAnyPattern(final String tableName, List<Pattern> tableNamePatterns) {
+    assert(tableNamePatterns != null);
+    for (Pattern tableNamePattern : tableNamePatterns) {
+      if (tableNamePattern.matcher(tableName).matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean inTableIncludedList(final String tableName) {
+    if (includedTableNamePatterns == null) {
+      // If the included list is empty, the repl policy is of the form db.[].[a,b], which is
+      // equivalent to db.[*].[a,b]. So, all tables must be accepted.
+      return true;
+    }
+    return tableMatchAnyPattern(tableName, includedTableNamePatterns);
+  }
+
+  private boolean inTableExcludedList(final String tableName) {
+    if (excludedTableNamePatterns == null) {
+      // If the excluded tables list is empty, all tables in the included list must be accepted.
+      return false;
+    }
+    return tableMatchAnyPattern(tableName, excludedTableNamePatterns);
+  }
+}
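A minimal usage sketch of the new class, assuming the matching semantics implemented above (all names and values are illustrative):

    import org.apache.hadoop.hive.common.repl.ReplScope;
    import java.util.Arrays;

    public class ReplScopeExample {
      public static void main(String[] args) {
        ReplScope scope = new ReplScope("salesdb");
        scope.setIncludedTablePatterns(Arrays.asList("t1.*", "t3"));
        scope.setExcludedTablePatterns(Arrays.asList("t100"));

        System.out.println(scope.tableIncludedInReplScope("t11"));  // true: matches include pattern "t1.*"
        System.out.println(scope.tableIncludedInReplScope("t100")); // false: matches exclude pattern "t100"
        System.out.println(scope.tableIncludedInReplScope(null));   // true: DB-level events always pass
        System.out.println(scope.includeAllTables());               // false: include/exclude lists are set
      }
    }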
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java
new file mode 100644
index 0000000000..8e9ba1a2ba
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package grouping common replication classes.
+ */
+package org.apache.hadoop.hive.common.repl;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index aeaa0ec080..1249058325 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -21,7 +21,7 @@

 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.common.ReplConst;
+import org.apache.hadoop.hive.common.repl.ReplConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
index 84302d62bf..b0347ce5c3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java
@@ -19,6 +19,7 @@

 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;

 public abstract class BasicFilter implements NotificationFilter {
   @Override
@@ -29,5 +30,11 @@ public boolean accept(final NotificationEvent event) {
     return shouldAccept(event);
   }

+  boolean isTxnRelatedEvent(final NotificationEvent event) {
+    return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) ||
+        (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) ||
+        (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)));
+  }
+
   abstract boolean shouldAccept(final NotificationEvent event);
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
index 712c12c895..fc4c9caa86 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.metastore.messaging.event.filters;

 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;

 import java.util.regex.Pattern;

@@ -40,12 +39,6 @@ public DatabaseAndTableFilter(final String databaseNameOrPattern, final String t
     this.tableName = tableName;
   }

-  private boolean isTxnRelatedEvent(final NotificationEvent event) {
-    return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) ||
-        (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) ||
-        (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT)));
-  }
-
   @Override
   boolean shouldAccept(final NotificationEvent event) {
     if ((dbPattern == null) || isTxnRelatedEvent(event)) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java
new file mode 100644
index 0000000000..e6a3e5cd7e
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.common.repl.ReplScope;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Utility class that constructs a notification filter to check if a table is accepted for replication.
+ */
+public class ReplEventFilter extends BasicFilter {
+  private final ReplScope replScope;
+
+  public ReplEventFilter(final ReplScope replScope) {
+    this.replScope = replScope;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    // All txn-related events are global ones and should always be accepted.
+    // For other events, the event should be accepted if the DB/table names are included
+    // as per the replication scope.
+    return (isTxnRelatedEvent(event)
+        || replScope.includedInReplScope(event.getDbName(), event.getTableName()));
+  }
+}
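Finally, a hedged sketch of how the new filter composes in an incremental dump, mirroring the ReplDumpTask change earlier in this patch (the event values and names are illustrative):

    import java.util.Arrays;
    import org.apache.hadoop.hive.common.repl.ReplScope;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;

    ReplScope scope = new ReplScope("salesdb");
    scope.setExcludedTablePatterns(Arrays.asList("orders_tmp"));

    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
        new ReplEventFilter(scope),
        new EventBoundaryFilter(100L, 200L));

    NotificationEvent ev = new NotificationEvent(150L, 0, MessageBuilder.INSERT_EVENT, "{}");
    ev.setDbName("salesdb");
    ev.setTableName("orders_tmp");
    evFilter.accept(ev); // false: the event is not txn-related and its table is excluded
                         // from the replication scope, so ReplEventFilter rejects it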