diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java new file mode 100644 index 0000000000..2ba895a378 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.security.UserGroupInformation; + +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import javax.annotation.Nullable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestTableLevelReplicationScenarios extends BaseReplicationScenariosAcidTables { + + @BeforeClass + public static void classLevelSetup() throws Exception { + HashMap<String, String> overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestTableLevelReplicationScenarios.class); + } +}
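The new suite starts with only the shared class-level setup; an eventual table-level test would drive a policy through a dump/load round trip. A minimal sketch of such a test method follows, assuming the WarehouseInstance-style helpers (primary, replica, primaryDbName, replicatedDbName, run/dump/load/verifyResults) that the BaseReplicationScenariosAcidTables hierarchy typically exposes; the method name, policy string, and table names are illustrative only, not part of this patch:

```java
@Test
public void testTableLevelDumpWithIncludeList() throws Throwable {
  // Hypothetical: policy "<db>.[t1, t2]" should bootstrap only t1 and t2.
  String replPolicy = primaryDbName + ".[t1, t2]";
  WarehouseInstance.Tuple tuple = primary
      .run("use " + primaryDbName)
      .run("create table t1 (id int)")
      .run("create table t2 (id int)")
      .run("create table t3 (id int)") // outside the include list
      .dump(replPolicy, null);

  replica.load(replicatedDbName, tuple.dumpLocation)
      .run("use " + replicatedDbName)
      .run("show tables")
      .verifyResults(new String[] {"t1", "t2"});
}
```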
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 2f72e23526..8c963be85d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; -import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; +import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; @@ -184,9 +184,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) { bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0); - LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {} and table {}", - work.dbNameOrPattern, - work.tableNameOrPattern); + LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}", + work.dbNameOrPattern); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -201,7 +200,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId); IMetaStoreClient.NotificationFilter evFilter = new AndFilter( - new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern), + new ReplEventFilter(work.replScope), new EventBoundaryFilter(work.eventFrom, work.eventTo)); EventUtils.MSClientNotificationFetcher evFetcher @@ -255,7 +254,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true); try (Writer writer = new Writer(dumpRoot, conf)) { - for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { + for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { try { Table table = hiveDb.getTable(dbName, tableName); @@ -297,8 +296,7 @@ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) db, conf, getNewEventOnlyReplicationSpec(ev.getEventId()), - work.dbNameOrPattern, - work.tableNameOrPattern + work.replScope ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); @@ -321,7 +319,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th Long bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L); assert (bootDumpBeginReplId >= 0L); - LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern); + LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern); long timeoutInMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS); long waitUntilTime = System.currentTimeMillis() + timeoutInMs; @@ -337,7 +335,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th " with first incremental dump pending : " + dbName); } replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), - Utils.getAllTables(hiveDb, dbName).size(), + Utils.getAllTables(hiveDb, dbName, work.replScope).size(), hiveDb.getAllFunctions().size()); replLogger.startLog(); Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb); @@ -349,7 +347,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) th conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY); try (Writer writer = new Writer(dbRoot, conf)) { - for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) { + for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug( "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index
d32be969e1..247066c451 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import com.google.common.primitives.Ints; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.plan.Explain; import org.slf4j.LoggerFactory; @@ -28,7 +29,8 @@ Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ReplDumpWork implements Serializable { - final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath; + final ReplScope replScope; + final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath; final Long eventFrom; Long eventTo; private Integer maxEventLimit; @@ -38,11 +40,11 @@ public static void injectNextDumpDirForTest(String dumpDir) { testInjectDumpDir = dumpDir; } - public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern, + public ReplDumpWork(ReplScope replScope, Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit, String resultTempPath) { - this.dbNameOrPattern = dbNameOrPattern; - this.tableNameOrPattern = tableNameOrPattern; + this.replScope = replScope; + this.dbNameOrPattern = replScope.getDbName(); this.eventFrom = eventFrom; this.eventTo = eventTo; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index f9f13e1a4c..2fcbe64aa7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -19,7 +19,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.ReplConst; +import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index bf9aa39307..19ebbbbc39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -393,6 +393,8 @@ TOK_REPL_LOAD; TOK_REPL_STATUS; TOK_REPL_CONFIG; TOK_REPL_CONFIG_LIST; +TOK_REPL_TABLES; +TOK_REPL_TABLES_LIST; TOK_TO; TOK_ONLY; TOK_SUMMARY; @@ -891,13 +893,13 @@ replDumpStatement @init { pushMsg("replication dump statement", state); } @after { popMsg(state); } : KW_REPL KW_DUMP - (dbName=identifier) (DOT tblName=identifier)? + (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? (KW_FROM (eventId=Number) (KW_TO (rangeEnd=Number))? (KW_LIMIT (batchSize=Number))? )? (KW_WITH replConf=replConfigs)? - -> ^(TOK_REPL_DUMP $dbName ^(TOK_TABNAME $tblName)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?) + -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?) 
; replLoadStatement @@ -924,6 +926,30 @@ replConfigsList keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST keyValueProperty+) ; +replTableLevelPolicy +@init { pushMsg("replication table level policy definition", state); } +@after { popMsg(state); } + : + ((replTablesIncludeList=replTablesList) (DOT replTablesExcludeList=replTablesList)?)? + -> ^(TOK_REPL_TABLES $replTablesIncludeList? $replTablesExcludeList?) + ; + +replTablesList +@init { pushMsg("replication tables list", state); } +@after { popMsg(state); } + : + (replTable=identifier) -> ^(TOK_REPL_TABLES_LIST $replTable) + | + (LSQUARE replTables=tablesList RSQUARE) -> ^(TOK_REPL_TABLES_LIST $replTables) + ; + +tablesList +@init { pushMsg("Comma separated table names list", state); } +@after { popMsg(state); } + : + tableName (COMMA tableName)* -> tableName+ + ; + replStatusStatement @init { pushMsg("replication status statement", state); } @after { popMsg(state); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index b4b849c4ec..6af1fa3590 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.Warehouse; @@ -61,17 +62,18 @@ import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO; public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { - // Database name or pattern - private String dbNameOrPattern; - // Table name or pattern - private String tblNameOrPattern; + // Replication Scope + private ReplScope replScope = new ReplScope(); + private Long eventFrom; private Long eventTo; private Integer maxEventLimit; + // Base path for REPL LOAD private String path; // Added conf member to set the REPL command specific config entries without affecting the configs @@ -101,23 +103,16 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { switch (ast.getToken().getType()) { case TOK_REPL_DUMP: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump"); - try { - initReplDump(ast); - } catch (HiveException e) { - throw new SemanticException(e.getMessage(), e); - } analyzeReplDump(ast); break; } case TOK_REPL_LOAD: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load"); - initReplLoad(ast); analyzeReplLoad(ast); break; } case TOK_REPL_STATUS: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status"); - initReplStatus(ast); analyzeReplStatus(ast); break; } @@ -137,47 +132,71 @@ private void setTxnConfigs() { private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); boolean isMetaDataOnly = false; - dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); - - // skip the first node,
which is always required - int currNode = 1; - while (currNode < numChildren) { - if (ast.getChild(currNode).getType() == TOK_REPL_CONFIG) { - Map<String, String> replConfigs - = DDLSemanticAnalyzer.getProps((ASTNode) ast.getChild(currNode).getChild(0)); - if (null != replConfigs) { - for (Map.Entry<String, String> config : replConfigs.entrySet()) { - conf.set(config.getKey(), config.getValue()); + String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + replScope.setDbName(dbNameOrPattern); + + // Skip the first node, which is always required + int childIdx = 1; + while (childIdx < numChildren) { + Tree currNode = ast.getChild(childIdx); + switch (currNode.getType()) { + case TOK_REPL_CONFIG: { + Map<String, String> replConfigs + = DDLSemanticAnalyzer.getProps((ASTNode) currNode.getChild(0)); + if (null != replConfigs) { + for (Map.Entry<String, String> config : replConfigs.entrySet()) { + conf.set(config.getKey(), config.getValue()); + } + isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY); } - isMetaDataOnly = HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY); + break; } - } else if (ast.getChild(currNode).getType() == TOK_TABNAME) { - // optional tblName was specified. - tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getChild(0).getText()); - } else { - // TOK_FROM subtree - Tree fromNode = ast.getChild(currNode); - eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); - // skip the first, which is always required - int numChild = 1; - while (numChild < fromNode.getChildCount()) { - if (fromNode.getChild(numChild).getType() == TOK_TO) { - eventTo = - Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); - // skip the next child, since we already took care of it - numChild++; - } else if (fromNode.getChild(numChild).getType() == TOK_LIMIT) { - maxEventLimit = - Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); - // skip the next child, since we already took care of it + case TOK_REPL_TABLES: { + List<String> tablesIncluded = new ArrayList<>(); + List<String> tablesExcluded = new ArrayList<>(); + Tree tablesListNode = currNode.getChild(0); + for (int child = 0; child < tablesListNode.getChildCount(); child++) { + tablesIncluded.add(tablesListNode.getChild(child).getText()); + } + if (!tablesIncluded.isEmpty()) { + replScope.setIncludedTablePatterns(tablesIncluded); + } + if (currNode.getChildCount() > 1) { + tablesListNode = currNode.getChild(1); + for (int child = 0; child < tablesListNode.getChildCount(); child++) { + tablesExcluded.add(tablesListNode.getChild(child).getText()); + } + if (!tablesExcluded.isEmpty()) { + replScope.setExcludedTablePatterns(tablesExcluded); + } + } + break; + } + default: { + // TOK_FROM subtree + Tree fromNode = currNode; + eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); + // skip the first, which is always required + int numChild = 1; + while (numChild < fromNode.getChildCount()) { + if (fromNode.getChild(numChild).getType() == TOK_TO) { + eventTo = + Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); + // skip the next child, since we already took care of it + numChild++; + } else if (fromNode.getChild(numChild).getType() == TOK_LIMIT) { + maxEventLimit = + Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); + // skip the next child, since we already took care of it + numChild++; + } + // move to the next child in FROM tree numChild++; } - // move to the next child in FROM tree - numChild++;
} } // move to the next root node - currNode++; + childIdx++; } for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { @@ -196,15 +215,17 @@ private void initReplDump(ASTNode ast) throws HiveException { // REPL DUMP private void analyzeReplDump(ASTNode ast) throws SemanticException { - LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) - + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to " - + String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit)); + try { + initReplDump(ast); + } catch (HiveException e) { + throw new SemanticException(e.getMessage(), e); + } + try { ctx.setResFile(ctx.getLocalTmpPath()); Task<ReplDumpWork> replDumpWorkTask = TaskFactory .get(new ReplDumpWork( - dbNameOrPattern, - tblNameOrPattern, + replScope, eventFrom, eventTo, ErrorMsg.INVALID_PATH.getMsg(ast), @@ -212,16 +233,8 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { ctx.getResFile().toUri().toString() ), conf); rootTasks.add(replDumpWorkTask); - if (dbNameOrPattern != null) { - for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { - if (tblNameOrPattern != null) { - for (String tblName : Utils.matchesTbl(db, dbName, tblNameOrPattern)) { - inputs.add(new ReadEntity(db.getTable(dbName, tblName))); - } - } else { - inputs.add(new ReadEntity(db.getDatabase(dbName))); - } - } + for (String dbName : Utils.matchesDb(db, replScope.getDbName())) { + inputs.add(new ReadEntity(db.getDatabase(dbName))); } setFetchTask(createFetchTask(dumpSchema)); } catch (Exception e) { @@ -262,10 +275,10 @@ private void initReplLoad(ASTNode ast) throws SemanticException { ASTNode childNode = (ASTNode) ast.getChild(i); switch (childNode.getToken().getType()) { case TOK_DBNAME: - dbNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); + replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText())); break; case TOK_TABNAME: - tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); + replScope.setTableName(PlanUtils.stripQuotes(childNode.getChild(0).getText())); break; case TOK_REPL_CONFIG: setConfigs((ASTNode) childNode.getChild(0)); @@ -319,13 +332,11 @@ private void initReplLoad(ASTNode ast) throws SemanticException { * 36/ */ private void analyzeReplLoad(ASTNode ast) throws SemanticException { - LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "." - + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path)); + initReplLoad(ast); - // for analyze repl load, we walk through the dir structure available in the path, + // For analyze repl load, we walk through the dir structure available in the path, // looking at each db, and then each table, and then setting up the appropriate // import job in its place.
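Taken together, the grammar and initReplDump map a dotted policy onto a ReplScope. A minimal sketch of the correspondence, using only the ReplScope API introduced in this patch (the policy strings are illustrative; per the tablesList rule they are bare identifiers, so regex-style patterns would need identifier quoting):

```java
import java.util.Arrays;
import org.apache.hadoop.hive.common.repl.ReplScope;

public class ReplScopeFromPolicySketch {
  public static void main(String[] args) {
    // "REPL DUMP sales_db"               -> db-level scope, all tables included.
    // "REPL DUMP sales_db.t1"            -> single-table include list.
    // "REPL DUMP sales_db.[t1, t2].[t2_tmp]" -> include list plus exclude list.
    // For the last policy, initReplDump effectively builds:
    ReplScope scope = new ReplScope();
    scope.setDbName("sales_db");
    scope.setIncludedTablePatterns(Arrays.asList("t1", "t2")); // first TOK_REPL_TABLES_LIST
    scope.setExcludedTablePatterns(Arrays.asList("t2_tmp"));   // optional second list

    System.out.println(scope.tableIncludedInReplScope("t1"));     // true: included, not excluded
    System.out.println(scope.tableIncludedInReplScope("t2_tmp")); // false: explicitly excluded
    System.out.println(scope.tableIncludedInReplScope("t3"));     // false: not in include list
  }
}
```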
- try { assert(path != null); Path loadPath = new Path(path); @@ -377,8 +388,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } else { LOG.debug("{} contains a bootstrap dump", loadPath); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo(), + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(), + replScope.getTableName(), queryState.getLineageState(), evDump, dmd.getEventTo(), dirLocationsToCopy(loadPath, evDump)); rootTasks.add(TaskFactory.get(replLoadWork, conf)); } catch (Exception e) { @@ -431,13 +442,13 @@ private void setConfigs(ASTNode node) throws SemanticException { // REPL STATUS private void initReplStatus(ASTNode ast) throws SemanticException{ - dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + replScope.setDbName(PlanUtils.stripQuotes(ast.getChild(0).getText())); int numChildren = ast.getChildCount(); for (int i = 1; i < numChildren; i++) { ASTNode childNode = (ASTNode) ast.getChild(i); switch (childNode.getToken().getType()) { case TOK_TABNAME: - tblNameOrPattern = PlanUtils.stripQuotes(childNode.getChild(0).getText()); + replScope.setTableName(PlanUtils.stripQuotes(childNode.getChild(0).getText())); break; case TOK_REPL_CONFIG: setConfigs((ASTNode) childNode.getChild(0)); @@ -449,9 +460,10 @@ private void initReplStatus(ASTNode ast) throws SemanticException{ } private void analyzeReplStatus(ASTNode ast) throws SemanticException { - LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern) - + "." + String.valueOf(tblNameOrPattern)); + initReplStatus(ast); + String dbNameOrPattern = replScope.getDbName(); + String tblNameOrPattern = replScope.getTableName(); String replLastId = null; try { @@ -484,7 +496,7 @@ private void analyzeReplStatus(ASTNode ast) throws SemanticException { prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string"); setFetchTask(createFetchTask("last_repl_id#string")); LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}", - String.valueOf(replLastId), ctx.getResFile(), conf); + replLastId, ctx.getResFile(), conf); } private void prepareReturnValues(List<String> values, String schema) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index b09ec25e4b..ce5ef06830 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -88,18 +89,24 @@ public static void writeOutput(List values, Path outputFile, HiveConf hi public static Iterable<? extends String> matchesTbl(Hive db, String dbName, String tblPattern) throws HiveException { if (tblPattern == null) { - return getAllTables(db, dbName); + return getAllTables(db, dbName, null); } else { return db.getTablesByPattern(dbName, tblPattern); } } - public static Collection<String> getAllTables(Hive db, String dbName) throws HiveException { + public static Iterable<? extends String> matchesTbl(Hive db,
String dbName, ReplScope replScope) + throws HiveException { + return getAllTables(db, dbName, replScope); + } + + public static Collection<String> getAllTables(Hive db, String dbName, ReplScope replScope) throws HiveException { return Collections2.filter(db.getAllTables(dbName), tableName -> { - assert tableName != null; + assert(tableName != null); return !tableName.toLowerCase().startsWith( - SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()); + SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase()) + && ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); }); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 5c2b3d1883..49236d7ec4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -18,10 +18,12 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; @@ -101,6 +103,20 @@ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hiv createDumpFile(context, qlMdTable, qlPtns, fileListArray); } + private List<WriteEventInfo> getAllWriteEventInfo(Context withinContext) throws Exception { + String contextDbName = StringUtils.normalizeIdentifier(withinContext.replScope.getDbName()); + RawStore rawStore = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf); + List<WriteEventInfo> writeEventInfoList + = rawStore.getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, null); + return ((writeEventInfoList == null) + ? null + : new ArrayList<>(Collections2.filter(writeEventInfoList, + writeEventInfo -> { + assert(writeEventInfo != null); + return withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable()); + }))); + } + @Override public void handle(Context withinContext) throws Exception { LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), eventMessageAsJSON); @@ -125,15 +141,12 @@ public void handle(Context withinContext) throws Exception { "not dumping acid tables."); replicatingAcidEvents = false; } - String contextDbName = withinContext.dbName == null ? null : - StringUtils.normalizeIdentifier(withinContext.dbName); - String contextTableName = withinContext.tableName == null ? null : - StringUtils.normalizeIdentifier(withinContext.tableName); + List<WriteEventInfo> writeEventInfoList = null; if (replicatingAcidEvents) { - writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf). - getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName); + writeEventInfoList = getAllWriteEventInfo(withinContext); } +
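Both new call sites, Utils.getAllTables above and getAllWriteEventInfo in CommitTxnHandler, apply the same pattern: fetch the unfiltered listing, then keep only entries the scope accepts via Guava's Collections2.filter. A standalone sketch of that pattern (the table names are illustrative):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import com.google.common.collect.Collections2;
import org.apache.hadoop.hive.common.repl.ReplScope;

public class ScopeFilteringSketch {
  public static void main(String[] args) {
    // Stand-in for db.getAllTables(dbName) / rawStore.getAllWriteEventInfo(...).
    List<String> allTables = Arrays.asList("t1", "t2", "t2_tmp");

    ReplScope scope = new ReplScope("sales_db");
    scope.setIncludedTablePatterns(Arrays.asList("t1", "t2"));
    scope.setExcludedTablePatterns(Arrays.asList("t2_tmp"));

    // Same shape as the predicates added in this patch; in the patch a null
    // scope means "no table-level policy", so everything passes.
    Collection<String> inScope = Collections2.filter(allTables,
        tableName -> scope.tableIncludedInReplScope(tableName));

    System.out.println(inScope); // [t1, t2]
  }
}
```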
int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0); if (numEntry != 0) { eventMessage.addWriteEventInfo(writeEventInfoList); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index ec35f4e94d..0d60c3136d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; - import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -40,18 +40,16 @@ final Hive db; final HiveConf hiveConf; final ReplicationSpec replicationSpec; - final String dbName; - final String tableName; + final ReplScope replScope; public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, - ReplicationSpec replicationSpec, String dbName, String tableName) { + ReplicationSpec replicationSpec, ReplScope replScope) { this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; this.hiveConf = hiveConf; this.replicationSpec = replicationSpec; - this.dbName = dbName; - this.tableName = tableName; + this.replScope = replScope; } public Context(Context other) { @@ -60,11 +58,10 @@ public Context(Context other) { this.db = other.db; this.hiveConf = other.hiveConf; this.replicationSpec = other.replicationSpec; - this.dbName = other.dbName; - this.tableName = other.tableName; + this.replScope = other.replScope; } - public void setEventRoot(Path eventRoot) { + void setEventRoot(Path eventRoot) { this.eventRoot = eventRoot; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 7a58dba877..a412d435fd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -39,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; @@ -103,13 +103,14 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throws Exception { List<String> tableList = Arrays.asList("a1", "a2"); String dbRandomKey = "akeytoberandom"; + ReplScope replScope = new ReplScope("default"); mockStatic(Utils.class); when(Utils.matchesDb(same(hive), eq("default"))) .thenReturn(Collections.singletonList("default")); - when(Utils.getAllTables(same(hive), eq("default"))).thenReturn(tableList); + when(Utils.getAllTables(same(hive), eq("default"), eq(replScope))).thenReturn(tableList); when(Utils.setDbBootstrapDumpState(same(hive), eq("default"))).thenReturn(dbRandomKey); - when(Utils.matchesTbl(same(hive), eq("default"),
anyString())).thenReturn(tableList); + when(Utils.matchesTbl(same(hive), eq("default"), eq(replScope))).thenReturn(tableList); when(hive.getAllFunctions()).thenReturn(Collections.emptyList()); @@ -138,7 +139,7 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, task.initialize(queryState, null, null, null); task.setWork( - new ReplDumpWork("default", "", + new ReplDumpWork(replScope, Long.MAX_VALUE, Long.MAX_VALUE, "", Integer.MAX_VALUE, "") ); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java similarity index 96% rename from standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java index e25189d35d..c2c8e4b20f 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hive.common; +package org.apache.hadoop.hive.common.repl; /** * A class that defines the constant strings used by the replication implementation. diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java new file mode 100644 index 0000000000..5c5e54aa10 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.repl; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Class that stores the replication scope. Replication scope includes the details of database and + * tables included under the scope of replication. + */ +public class ReplScope { + private String dbName; + private String tableName; // Only for REPL LOAD and REPL STATUS. + + private Pattern dbNamePattern; + private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP + private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP + + public ReplScope() { + } + + public ReplScope(String dbName) { + setDbName(dbName); + } + + public void setDbName(String dbName) { + this.dbName = dbName; + this.dbNamePattern = (((dbName == null) || dbName.equals("*")) + ?
null : Pattern.compile(dbName, Pattern.CASE_INSENSITIVE)); + } + + public String getDbName() { + return dbName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return tableName; + } + + public void setIncludedTablePatterns(List<String> includedTableNamePatterns) { + this.includedTableNamePatterns = compilePatterns(includedTableNamePatterns); + } + + public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) { + this.excludedTableNamePatterns = compilePatterns(excludedTableNamePatterns); + } + + public boolean includedInReplScope(final String dbName, final String tableName) { + return dbIncludedInReplScope(dbName) && tableIncludedInReplScope(tableName); + } + + public boolean dbIncludedInReplScope(final String dbName) { + return (dbNamePattern == null) || dbNamePattern.matcher(dbName).matches(); + } + + public boolean tableIncludedInReplScope(final String tableName) { + return (inTableIncludedList(tableName) && !inTableExcludedList(tableName)); + } + + private List<Pattern> compilePatterns(List<String> patterns) { + if (patterns == null || patterns.isEmpty()) { + return null; + } + List<Pattern> compiledPatterns = new ArrayList<>(); + for (String pattern : patterns) { + // Compile the pattern as case-insensitive because events/HMS will have table names in lower case. + compiledPatterns.add(Pattern.compile(pattern, Pattern.CASE_INSENSITIVE)); + } + return compiledPatterns; + } + + private boolean tableMatchAnyPattern(final String tableName, List<Pattern> tableNamePatterns) { + assert(tableNamePatterns != null); + for (Pattern tableNamePattern : tableNamePatterns) { + if (tableNamePattern.matcher(tableName).matches()) { + return true; + } + } + return false; + } + + private boolean inTableIncludedList(final String tableName) { + if (includedTableNamePatterns == null) { + // If the included list is empty, the repl policy is of the form db.[].[a,b], which is + // equivalent to db.*.[a,b]. So, all tables must be accepted. + return true; + } + return tableMatchAnyPattern(tableName, includedTableNamePatterns); + } + + private boolean inTableExcludedList(final String tableName) { + if (excludedTableNamePatterns == null) { + // If the excluded list is empty, all tables that pass the include check must be accepted.
+ return false; + } + return tableMatchAnyPattern(tableName, excludedTableNamePatterns); + } +}
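The defaults in inTableIncludedList/inTableExcludedList are worth spelling out: a null include list accepts every table (db.[].[a,b] behaves like db.*.[a,b]), a null exclude list rejects nothing, and setDbName treats "*" (or null) as "match any database". A short sketch of those edge cases, using only the ReplScope API above:

```java
import java.util.Arrays;
import org.apache.hadoop.hive.common.repl.ReplScope;

public class ReplScopeEdgeCasesSketch {
  public static void main(String[] args) {
    // Exclude-only policy: no include list, so every table passes the include
    // check; only the exclude patterns can reject it.
    ReplScope excludeOnly = new ReplScope("sales_db");
    excludeOnly.setExcludedTablePatterns(Arrays.asList("tmp_.*"));
    System.out.println(excludeOnly.tableIncludedInReplScope("orders"));    // true
    System.out.println(excludeOnly.tableIncludedInReplScope("tmp_stage")); // false

    // "*" disables the db pattern entirely.
    ReplScope anyDb = new ReplScope("*");
    System.out.println(anyDb.dbIncludedInReplScope("any_database"));          // true
    System.out.println(anyDb.includedInReplScope("any_database", "orders")); // true
  }
}
```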
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index aeaa0ec080..1249058325 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -21,7 +21,7 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.common.ReplConst; +import org.apache.hadoop.hive.common.repl.ReplConst; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java index 84302d62bf..b0347ce5c3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; public abstract class BasicFilter implements NotificationFilter { @Override @@ -29,5 +30,11 @@ public boolean accept(final NotificationEvent event) { return shouldAccept(event); } + boolean isTxnRelatedEvent(final NotificationEvent event) { + return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) || + (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT))); + } + abstract boolean shouldAccept(final NotificationEvent event); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java index 712c12c895..fc4c9caa86 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.metastore.messaging.event.filters; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; import java.util.regex.Pattern; @@ -40,12 +39,6 @@ public DatabaseAndTableFilter(final String databaseNameOrPattern, final String t this.tableName = tableName; } - private boolean isTxnRelatedEvent(final NotificationEvent event) { - return ((event.getEventType().equals(MessageBuilder.OPEN_TXN_EVENT)) || - (event.getEventType().equals(MessageBuilder.COMMIT_TXN_EVENT)) || - (event.getEventType().equals(MessageBuilder.ABORT_TXN_EVENT))); - } - @Override boolean shouldAccept(final NotificationEvent event) { if ((dbPattern == null) || isTxnRelatedEvent(event)) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java new file mode 100644 index 0000000000..e6a3e5cd7e --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/ReplEventFilter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.common.repl.ReplScope; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * Event filter that checks whether a given event's database and table are accepted for + * replication as per the replication scope. + */ +public class ReplEventFilter extends BasicFilter { + private final ReplScope replScope; + + public ReplEventFilter(final ReplScope replScope) { + this.replScope = replScope; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + // All txn related events are global ones and should always be accepted. + // For other events, accept the event only if its DB/table names are included in the + // replication scope. + return (isTxnRelatedEvent(event) + || replScope.includedInReplScope(event.getDbName(), event.getTableName())); + } +}
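To close the loop, this filter is what ReplDumpTask now composes with the event-id boundary check (see the AndFilter construction in incrementalDump above). A sketch exercising both acceptance paths, assuming the standard thrift NotificationEvent constructor and setters:

```java
import java.util.Arrays;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;

public class ReplEventFilterSketch {
  public static void main(String[] args) {
    ReplScope scope = new ReplScope("sales_db");
    scope.setIncludedTablePatterns(Arrays.asList("orders"));

    // Same composition as incrementalDump: scope filter AND event-id window.
    IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
        new ReplEventFilter(scope),
        new EventBoundaryFilter(100L, 200L));

    // Txn events carry no db/table and bypass the scope check.
    NotificationEvent commitTxn =
        new NotificationEvent(150L, 0, MessageBuilder.COMMIT_TXN_EVENT, "{}");
    System.out.println(evFilter.accept(commitTxn)); // true

    // Non-txn events must match both the db and the table patterns.
    NotificationEvent createTbl =
        new NotificationEvent(150L, 0, MessageBuilder.CREATE_TABLE_EVENT, "{}");
    createTbl.setDbName("sales_db");
    createTbl.setTableName("customers"); // not in the include list
    System.out.println(evFilter.accept(createTbl)); // false
  }
}
```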