diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java
new file mode 100644
index 0000000000..2956678edd
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * TestPartitionLevelReplication - test partition level replication.
+ */
+public class TestPartitionLevelReplication extends BaseReplicationScenariosAcidTables {
+  private static String basicFilter = " 't0' where key0 < 5 and key0 > 0 or key0 = 100, 't1*' where key1 = 1, "
+      + " '(t[2]+[0-9]*)|(t3)' where key2 > 2, 't4' where key4 != 1";
+  private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+        UserGroupInformation.getCurrentUser().getUserName());
+
+    internalBeforeClassSetup(overrides, TestPartitionLevelReplication.class);
+  }
+
+  static void internalBeforeClassSetup(Map<String, String> overrides,
+      Class clazz) throws Exception {
+
+    conf = new HiveConf(clazz);
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    Map<String, String> acidEnableConf = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.in.repl.test", "true");
+    }};
+
+    acidEnableConf.putAll(overrides);
+
+    primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
+  private void createTables(String txnProperty) throws Throwable {
+    if (txnProperty == null) {
+      txnProperty = "'transactional'='true'";
+    }
+    String tableProperty = "STORED AS ORC TBLPROPERTIES ( " + txnProperty + ")";
+    primary.run("use " + primaryDbName)
+        .run("CREATE TABLE t0(a int) partitioned by (key0 int) " + tableProperty)
+        .run("CREATE TABLE t1(a int) partitioned by (key1 int) " + tableProperty)
+        .run("CREATE TABLE t13(a int) partitioned by (key1 int) " + tableProperty)
+        .run("CREATE TABLE t2(a int) partitioned by (key2 int) " + tableProperty)
+        .run("CREATE TABLE t23(a int) partitioned by (key2 int) " + tableProperty)
+        .run("CREATE TABLE t3(a int) partitioned by (key2 int) " + tableProperty)
+        .run("CREATE TABLE t4(a int) partitioned by (key4 int) " + tableProperty)
+        .run("CREATE TABLE t5(a int) partitioned by (key5 int) " + tableProperty)
+        .run("CREATE TABLE t6(a int) " + tableProperty);
+  }
+
+  private void insertRecords(boolean isMultiStmtTxn) throws Throwable {
+    String txnStrStart = "use " + primaryDbName;
+    String txnStrCommit = "use " + primaryDbName;
+    if (isMultiStmtTxn) {
+      txnStrStart = "START TRANSACTION";
+      txnStrCommit = "COMMIT";
+    }
+
+    primary.run("use " + primaryDbName).run(txnStrStart)
+        .run("INSERT INTO t0 partition (key0 = 1) values (1) ")
+        .run("INSERT INTO t0 partition (key0 = 2) values (2) ")
+        .run("INSERT INTO t0 partition (key0 = 3) values (3) ")
+        .run("INSERT INTO t0 partition (key0 = 5) values (5) ")
+        .run("INSERT INTO t0 partition (key0 = 100) values (100) ")
+
+        .run("INSERT INTO t1 partition (key1 = 1) values (1) ")
+        .run("INSERT INTO t1 partition (key1 = 2) values (2) ")
+        .run("INSERT INTO t13 partition (key1 = 1) values (1) ")
+
+        .run("INSERT INTO t2 partition (key2 = 3) values (3) ")
+        .run("INSERT INTO t23 partition (key2 = 3) values (3) ")
+        .run("INSERT INTO t2 partition (key2 = 2) values (2) ")
+        .run("INSERT INTO t23 partition (key2 = 1) values (4) ")
+        .run("INSERT INTO t23 partition (key2 = 4) values (5) ")
+        .run("INSERT INTO t3 partition (key2 = 3) values (3) ")
+
+        .run("INSERT INTO t4 partition (key4 = 1) values (3) ")
+        .run("INSERT INTO t5 partition (key5 = 1) values (3) ")
+        .run("INSERT INTO t6 values (3) ")
+        .run(txnStrCommit);
+  }
+
+  private void verifyTableContent() throws Throwable {
+    // For table t0, the partition with value 5 does not satisfy the filter condition and is not replicated.
+    replica.run("use " + replicatedDbName)
+        .run("SELECT a from t0 order by a")
+        .verifyResults(new String[] {"1", "2", "3", "100"})
+
+        // For 't1*', both t1 and t13 are filtered.
+        .run("SELECT a from t1")
+        .verifyResults(new String[] {"1"})
+        .run("SELECT a from t13")
+        .verifyResults(new String[] {"1"})
+
+        // For '(t[2]+[0-9]*)|(t3)', tables t2, t23 and t3 are filtered.
+        .run("SELECT a from t2")
+        .verifyResults(new String[] {"3"})
+        .run("SELECT a from t23")
+        .verifyResults(new String[] {"3", "5"})
+        .run("SELECT a from t3")
+        .verifyResults(new String[] {"3"})
+
+        // For t4, none of the partitions satisfies the filter condition.
+        .run("SELECT a from t4")
+        .verifyResults(new String[] {})
+
+        // t5 and t6 are not part of the filter string, so they are not filtered.
+        .run("SELECT a from t5")
+        .verifyResults(new String[] {"3"})
+        .run("SELECT a from t6")
+        .verifyResults(new String[] {"3"});
+  }
+
+  @Test
+  public void testPartLevelReplicationBootstrapAcidTable() throws Throwable {
+    createTables(null);
+    insertRecords(false);
+    WarehouseInstance.Tuple bootStrapDump =
+        primary.dump(primaryDbName, null, null, null, basicFilter);
+    replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId);
+    verifyTableContent();
+  }
+
+  @Test
+  public void testPartLevelReplicationBootstrapNonAcidTable() throws Throwable {
+    createTables("'transactional'='false'");
+    insertRecords(false);
+    WarehouseInstance.Tuple bootStrapDump =
+        primary.dump(primaryDbName, null, null, null, basicFilter);
+    replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId);
+    verifyTableContent();
+  }
+
+  @Test
+  public void testPartLevelReplicationBootstrapTableFilter() throws Throwable {
+    createTables(null);
+    insertRecords(false);
+    WarehouseInstance.Tuple bootStrapDump =
+        primary.dump(primaryDbName + ".'t0|t1'", null, null,
+            null, basicFilter);
+    replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verifyResult(bootStrapDump.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"t0", "t1"})
+        .run("SELECT a from t0 order by a")
+        .verifyResults(new String[] {"1", "2", "3", "100"})
+        .run("SELECT a from t1")
+        .verifyResults(new String[] {"1"});
+  }
+
+  @Test
+  public void testPartLevelReplicationBootstrapDateTypeField() throws Throwable {
+    String filter = "'acid_table' where year(dt) >= 2019 and month(dt) >= 6";
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table acid_table (a int) partitioned by (dt string) STORED AS "
+            + " ORC TBLPROPERTIES ('transactional'='true')")
+        .run("insert into acid_table partition(dt='1970-01-01') values (1)")
+        .run("insert into acid_table partition(dt='2019-06-01') values (2)")
+        .run("insert into acid_table partition(dt='2018-12-01') values (3)")
+        .run("insert into acid_table partition(dt='2019-01-01') values (4)")
+        .dump(primaryDbName, null, null, null, filter);
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("select a from acid_table")
+        .verifyResults(new String[] {"2"});
+  }
+
+  @Test
+  public void testPartLevelReplicationBootstrapPatterns() throws Throwable {
String filter = "'(a[0-9]+)|(b)' where (year(dt) >= 2019 and month(dt) >= 6) or (year(dt) == 1947)"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table a1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into a1 partition(dt='1970-01-01') values (1)") + .run("insert into a1 partition(dt='2019-05-01') values (2)") + .run("insert into a1 partition(dt='2019-07-01') values (3)") + .run("insert into a1 partition(dt='1900-08-01') values (4)") + .run("create table b (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b partition(dt='1983-01-01') values (1)") + .run("insert into b partition(dt='1947-05-01') values (2)") + .run("insert into b partition(dt='1947-07-01') values (3)") + .run("create table b1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b1 partition(dt='1983-01-01') values (1)") + .run("insert into b1 partition(dt='1947-05-01') values (2)") + .run("insert into b1 partition(dt='2019-06-01') values (3)") + .dump(primaryDbName, null, null, null, filter); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from a1") // a1 is filtered as per 'a[0-9]+' + .verifyResults(new String[] {"3"}) + .run("select a from b") // b is filtered as per 'b' + .verifyResults(new String[] {"2", "3"}) + .run("select a from b1") // b1 is not filtered + .verifyResults(new String[] {"1", "2", "3"}); + } + + @Test + public void testPartLevelReplictionBootstrapExternalTable() throws Throwable { + List loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + String filter = "'(a[0-9]+)|(b)' where (year(dt) >= 2019 and month(dt) >= 6) or (year(dt) == 1947)"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table a1 (a int) partitioned by (dt string)") + .run("insert into a1 partition(dt='1970-01-01') values (1)") + .run("insert into a1 partition(dt='1947-05-01') values (2)") + .run("insert into a1 partition(dt='2019-07-01') values (3)") + .run("insert into a1 partition(dt='1900-08-01') values (4)") + .dump(primaryDbName, null, null, dumpWithClause, filter); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select a from a1") + .verifyResults(new String[] {"2", "3"}); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 6326bc34f2..ec9791aaca 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -255,6 +255,11 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable { return this; } + Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, List withClauseOptions) + throws Throwable { + return dump(replPolicy, oldReplPolicy, lastReplicationId, withClauseOptions, null); + } + Tuple dump(String dbName, String lastReplicationId, List withClauseOptions) throws Throwable { String dumpCommand = @@ -265,15 +270,19 @@ Tuple 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 6326bc34f2..ec9791aaca 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -255,6 +255,11 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable {
     return this;
   }
 
+  Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, List<String> withClauseOptions)
+      throws Throwable {
+    return dump(replPolicy, oldReplPolicy, lastReplicationId, withClauseOptions, null);
+  }
+
   Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
       throws Throwable {
     String dumpCommand =
@@ -265,15 +270,19 @@ Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions)
     return dump(dumpCommand);
   }
 
-  Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, List<String> withClauseOptions)
-      throws Throwable {
+  Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, List<String> withClauseOptions,
+      String partitionFilter) throws Throwable {
     String dumpCommand = "REPL DUMP " + replPolicy
        + (oldReplPolicy == null ? "" : " REPLACE " + oldReplPolicy)
        + (lastReplicationId == null ? "" : " FROM " + lastReplicationId);
-    if (!withClauseOptions.isEmpty()) {
+    if (withClauseOptions != null && !withClauseOptions.isEmpty()) {
      dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")";
    }
+
+    if (partitionFilter != null) {
+      dumpCommand += " where " + partitionFilter;
+    }
     return dump(dumpCommand);
   }
 
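For orientation, a minimal sketch of the command text the patched dump() helper ends up issuing. The database name and the single-table filter below are hypothetical (borrowed from the style of the new test), and the snippet only mirrors the string composition added above rather than calling the test harness itself.

    public class ReplDumpCommandSketch {
      public static void main(String[] args) {
        // Hypothetical inputs; "srcdb" is not a database referenced by this patch.
        String replPolicy = "srcdb";
        String partitionFilter = "'t0' where key0 < 5 and key0 > 0 or key0 = 100";

        // The per-table filter list is appended after a trailing "where" keyword,
        // following any WITH clause, exactly as in the patched helper.
        String dumpCommand = "REPL DUMP " + replPolicy;
        if (partitionFilter != null) {
          dumpCommand += " where " + partitionFilter;
        }
        System.out.println(dumpCommand);
        // Prints: REPL DUMP srcdb where 't0' where key0 < 5 and key0 > 0 or key0 = 100
      }
    }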
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index ae9d040ef8..f666df23c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -53,7 +53,7 @@ public int execute(DriverContext driverContext) {
       LOG.debug("Exporting data to: {}", exportPaths.exportRootDir());
       work.acidPostProcess(db);
       TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
-          work.getReplicationSpec(), db, null, conf, work.getMmContext());
+          work.getReplicationSpec(), db, null, conf, work.getMmContext(), null);
       if (!tableExport.write()) {
         throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg());
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 4cd60cc6ee..467ed14db4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -495,7 +495,6 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
       for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
         LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
         Table table = null;
-
         try {
           HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
           table = tableTuple != null ? tableTuple.object : null;
@@ -580,8 +579,8 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
         tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
       }
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
-      new TableExport(
-          exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
+      new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf,
+          mmCtx, work.replScope).write();
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
index e635670932..1aed4c5a78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -45,6 +52,7 @@
  * IllegalStateException.
  */
 public class PartitionIterable implements Iterable<Partition> {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionIterable.class);
 
   @Override
   public Iterator<Partition> iterator() {
@@ -56,10 +64,26 @@
       private Iterator<String> partitionNamesIter = null;
       private Iterator<Partition> batchIter = null;
 
+      private Iterator<Partition> partitionFilterIter;
+
       private void initialize(){
         if(!initialized){
           if (currType == Type.LIST_PROVIDED){
             ptnsIterator = ptnsProvided.iterator();
+          } else if (currType == Type.FILTER_PROVIDED) {
+            try {
+              List<Partition> result = new ArrayList<>();
+              boolean hasUnknown = db.getPartitionsByExpr(table,
+                  (ExprNodeGenericFuncDesc) partitionFilter, db.getConf(), result);
+              if (hasUnknown) {
+                throw new SemanticException(
+                    "Unexpected unknown partitions for " + partitionFilter.getExprString());
+              }
+              partitionFilterIter = result.iterator();
+            } catch (Exception e) {
+              LOG.error("Failed to extract partitions for " + table.getDbName() + "." + table.getTableName(), e);
+              throw new RuntimeException(e.getMessage());
+            }
           } else {
             partitionNamesIter = partitionNames.iterator();
           }
@@ -72,6 +96,8 @@ public boolean hasNext() {
         initialize();
         if (currType == Type.LIST_PROVIDED){
           return ptnsIterator.hasNext();
+        } else if (currType == Type.FILTER_PROVIDED) {
+          return partitionFilterIter == null ? false : partitionFilterIter.hasNext();
         } else {
           return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext();
         }
@@ -84,6 +110,10 @@ public Partition next() {
           return ptnsIterator.next();
         }
 
+        if (currType == Type.FILTER_PROVIDED) {
+          return partitionFilterIter.next();
+        }
+
         if ((batchIter == null) || !batchIter.hasNext()){
           getNextBatch();
         }
@@ -115,7 +145,8 @@ public void remove() {
 
   enum Type {
     LIST_PROVIDED,  // Where a List<Partition> is provided
-    LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed
+    LAZY_FETCH_PARTITIONS, // Where we want to fetch Partitions lazily when they're needed
+    FILTER_PROVIDED // Where a partition filter expression is provided
   }
 
@@ -126,6 +157,14 @@
   private List<String> partitionNames = null;
   private int batch_size;
   private boolean getColStats = false;
+  ExprNodeDesc partitionFilter;
+
+  public PartitionIterable(Hive db, Table tbl, ExprNodeDesc filter) {
+    this.currType = Type.FILTER_PROVIDED;
+    this.table = tbl;
+    this.partitionFilter = filter;
+    this.db = db;
+  }
 
   /**
    * Dummy constructor, which simply acts as an iterator on an already-present
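A minimal sketch of how the new FILTER_PROVIDED mode might be driven. The helper below is not part of the patch; it assumes the caller already built the filter as an ExprNodeGenericFuncDesc over the table's partition columns (e.g. key0 < 5), which is what the new constructor hands to getPartitionsByExpr().

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

    public class FilteredPartitionScanSketch {
      // Iterates only the partitions matching the filter; in FILTER_PROVIDED mode the
      // list is fetched once from the metastore on the first call to hasNext()/next().
      static void printMatchingPartitions(Hive db, Table table, ExprNodeDesc filter) {
        for (Partition p : new PartitionIterable(db, table, filter)) {
          System.out.println(p.getName());
        }
      }
    }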
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 4a366a9360..47dc85f603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -108,7 +108,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     // Note: this tableExport is actually never used other than for auth, and another one is
     //       created when the task is executed. So, we don't care about the correct MM state here.
     TableExport.AuthEntities authEntities = new TableExport(
-        exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities();
+        exportPaths, ts, replicationSpec, db, null, conf, null, null).getAuthEntities();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
     String exportRootDirName = tmpPath;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 73f1804e8c..be173c8ea6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -398,6 +398,8 @@ TOK_REPL_CONFIG;
TOK_REPL_CONFIG_LIST;
TOK_REPL_TABLES;
TOK_REPL_TABLES_LIST;
+TOK_REPL_COND;
+TOK_REPL_COND_LIST;
TOK_TO;
TOK_ONLY;
TOK_SUMMARY;
@@ -904,7 +906,8 @@ replDumpStatement
       (KW_LIMIT (batchSize=Number))?
     )?
     (KW_WITH replConf=replConfigs)?
-    -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    (KW_WHERE tblwh=tableWhereClauseList)?
+    -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf? $tblwh?)
     ;
 
 replDbPolicy
@@ -912,5 +915,19 @@ replDbPolicy
 @after { popMsg(state); }
     : (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> $dbName $tablePolicy?
+    ;
+
+tableWhereClauseList
+@init { pushMsg("Table where clause List for partition filter", state); }
+@after { popMsg(state); }
+    :
+    tableWhereClause (COMMA tableWhereClause)* -> ^(TOK_REPL_COND_LIST tableWhereClause+)
+    ;
+
+tableWhereClause
+@init { pushMsg("Where clause for each table name pattern", state); }
+@after { popMsg(state); }
+    :
+    tblName=StringLiteral wh=whereClause -> ^(TOK_REPL_COND $tblName $wh)
     ;
 
 replLoadStatement
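To make the grammar change concrete, here are statement shapes the extended replDumpStatement rule should now accept, written as Java string constants so they stay in the surrounding language. The database and table names are made up; each tableWhereClause pairs a quoted table-name pattern with its own whereClause.

    public class ReplDumpFilterSyntaxExamples {
      // Single table pattern with a partition predicate.
      static final String SINGLE_PATTERN =
          "REPL DUMP srcdb WHERE 't0' WHERE key0 < 5";

      // Several comma-separated pattern/predicate pairs; the outer WHERE introduces
      // the list (TOK_REPL_COND_LIST), each pair becomes a TOK_REPL_COND node.
      static final String MULTIPLE_PATTERNS =
          "REPL DUMP srcdb WHERE 't1*' WHERE key1 = 1, '(t[2]+[0-9]*)|(t3)' WHERE key2 > 2";
    }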
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 39789ca22f..22317d7305 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -65,6 +65,11 @@
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_WHERE;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABREF;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABLE_OR_COL;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_COND_LIST;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TABNAME;
 
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Replication Scope
@@ -180,6 +185,22 @@ private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException {
     setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope);
   }
 
+  private void extractPartitionFilter(Tree condNode, ReplScope replScope) throws Exception {
+    for (int i = 0; i < condNode.getChildCount(); i++) {
+      Tree node = condNode.getChild(i);
+      // Table pattern node
+      String tablePattern = unescapeSQLString(node.getChild(0).getText());
+      if (tablePattern == null || tablePattern.isEmpty()) {
+        throw new SemanticException(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN);
+      }
+
+      // Filter Node
+      Tree filter = node.getChild(1).getChild(0);
+      LOG.info("Adding filter " + filter.toStringTree() + " to repl scope for pattern list " + tablePattern);
+      replScope.setPartFilter(tablePattern, filter);
+    }
+  }
+
   private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
     boolean isMetaDataOnly = false;
@@ -236,6 +257,15 @@ private void initReplDump(ASTNode ast) throws HiveException {
         }
         break;
       }
+      case TOK_REPL_COND_LIST: {
+        try {
+          extractPartitionFilter(currNode, replScope);
+        } catch (Exception e) {
+          LOG.error("Failed to extract partition filter.", e);
+          throw new HiveException(e.getMessage());
+        }
+        break;
+      }
      default: {
        throw new SemanticException("Unrecognized token " + currNode.getType() + " in REPL DUMP statement.");
      }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d47457857c..aed3d98dbb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11368,6 +11368,20 @@ protected String getAliasId(String alias, QB qb) {
     return (qb.getId() == null ? alias : qb.getId() + ":" + alias).toLowerCase();
   }
 
+  public static RowResolver getRowResolverFromTable(Table tbl) {
+    RowResolver rwsch = new RowResolver();
+    String alias = tbl.getTableName();
+    for (FieldSchema cols : tbl.getAllCols()) {
+      rwsch.put(alias, cols.getName(), new ColumnInfo(cols.getName(),
+          TypeInfoFactory.getPrimitiveTypeInfo(cols.getType()), alias, false));
+    }
+    for (FieldSchema part_col : tbl.getPartCols()) {
+      rwsch.put(alias, part_col.getName(), new ColumnInfo(part_col.getName(),
+          TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias, true));
+    }
+    return rwsch;
+  }
+
   @SuppressWarnings("nls")
   private Operator genTablePlan(String alias, QB qb) throws SemanticException {
alias : qb.getId() + ":" + alias).toLowerCase(); } + public static RowResolver getRowResolverFromTable(Table tbl) { + RowResolver rwsch = new RowResolver(); + String alias = tbl.getTableName(); + for (FieldSchema cols : tbl.getAllCols()) { + rwsch.put(alias, cols.getName(), new ColumnInfo(cols.getName(), + TypeInfoFactory.getPrimitiveTypeInfo(cols.getType()), alias, false)); + } + for (FieldSchema part_col : tbl.getPartCols()) { + rwsch.put(alias, part_col.getName(), new ColumnInfo(part_col.getName(), + TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), alias, true)); + } + return rwsch; + } + @SuppressWarnings("nls") private Operator genTablePlan(String alias, QB qb) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 56850417dd..9e091f044b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -30,12 +31,17 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.TypeCheckCtx; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,9 +69,10 @@ private final HiveConf conf; private final Paths paths; private final MmContext mmCtx; + private ExprNodeDesc partitionFilter = null; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx, ReplScope replScope) throws SemanticException { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -83,6 +90,17 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.conf = conf; this.paths = paths; this.mmCtx = mmCtx; + if (replScope != null) { + ASTNode filterNode = (ASTNode) replScope.getPartFilter(tableSpec.tableHandle.getTableName()); + if (filterNode == null) { + logger.info("Partition filter is not set for table " + tableSpec.tableHandle.getTableName()); + } else { + partitionFilter = TypeCheckProcFactory.genExprNode(filterNode, + new TypeCheckCtx(SemanticAnalyzer.getRowResolverFromTable(tableSpec.tableHandle))).get(filterNode); + logger.info("Partition filter " + filterNode.toStringTree() + " is set for table " + + tableSpec.tableHandle.getTableName()); + } + } } public 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
index 8b26f1ce40..de8161b9fe 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hive.common.repl;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 /**
@@ -33,6 +37,7 @@
   private String excludedTableNames;
   private Pattern includedTableNamePattern;
   private Pattern excludedTableNamePattern;
+  private Map<Pattern, Object> partFilter = new HashMap<>();
 
   public ReplScope() {
   }
@@ -116,4 +121,23 @@ private boolean inTableExcludedList(final String tableName) {
     }
     return excludedTableNamePattern.matcher(tableName).matches();
   }
+
+  public Object getPartFilter(String tableName) {
+    Object filter = null;
+    // The number of patterns is not expected to be huge, so iterating the whole map once per table
+    // should not be an issue.
+    for (Pattern pattern : partFilter.keySet()) {
+      if (pattern.matcher(tableName).matches()) {
+        if (filter != null) {
+          throw new RuntimeException("Table " + tableName + " is matching multiple partition filter patterns.");
+        }
+        filter = partFilter.get(pattern);
+      }
+    }
+    return filter;
+  }
+
+  public void setPartFilter(String patternString, Object filter) {
+    partFilter.put(Pattern.compile(patternString, Pattern.CASE_INSENSITIVE), filter);
+  }
 }
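Finally, a small sketch of how the new filter map behaves. The patterns and the plain String used as a stand-in filter object are illustrative only; in the patch the analyzer actually stores the parsed WHERE clause AST as the value.

    import org.apache.hadoop.hive.common.repl.ReplScope;

    public class PartFilterLookupSketch {
      public static void main(String[] args) {
        ReplScope scope = new ReplScope();
        // Keys are compiled case-insensitively; values are opaque filter objects.
        scope.setPartFilter("t1.*", "key1 = 1");
        scope.setPartFilter("(t[2]+[0-9]*)|(t3)", "key2 > 2");

        System.out.println(scope.getPartFilter("T13")); // matches only t1.* -> key1 = 1
        System.out.println(scope.getPartFilter("t23")); // matches only the second pattern -> key2 > 2
        System.out.println(scope.getPartFilter("t5"));  // matches nothing -> null
        // A table name matching more than one registered pattern triggers the
        // RuntimeException thrown in getPartFilter().
      }
    }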