diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java new file mode 100644 index 0000000000..bdedf5fdcb --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestPartitionLevelReplication.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.hadoop.hive.shims.Utils; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +/** + * TestPartitionLevelReplication - test partition level replication. + */ +public class TestPartitionLevelReplication extends BaseReplicationScenariosAcidTables { + private String basicFilter = " PARTITIONS 't0' where key0 < 5 and key0 > 0 or key0 = 100, 't1*' where key1 = 1, " + + " '(t[2]+[0-9]*)|(t3)' where key2 > 2, 't4' where key4 != 1"; + private String basicReplPolicy = null; + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestPartitionLevelReplication.class); + } + + static void internalBeforeClassSetup(Map overrides, + Class clazz) throws Exception { + + conf = new HiveConf(clazz); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + Map acidEnableConf = new HashMap() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.in.repl.test", "true"); + }}; + + acidEnableConf.putAll(overrides); + + primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); + replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf); + } + + @Before + public void setup() throws Throwable { + super.setup(); + basicReplPolicy = primaryDbName + basicFilter; + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + + private void createTables(String txnProperty) throws Throwable { + if (txnProperty == null) { + txnProperty = "'transactional'='true'"; + } + String tableProperty = "STORED AS ORC TBLPROPERTIES ( " + txnProperty + ")"; + primary.run("use " + primaryDbName) + .run("CREATE TABLE t0(a int) partitioned by (key0 int) " + tableProperty) + .run("CREATE TABLE t1(a int) partitioned by (key1 int) " + tableProperty) + .run("CREATE TABLE t13(a int) partitioned by (key1 int) " + tableProperty) + .run("CREATE TABLE t2(a int) partitioned by (key2 int) " + tableProperty) + .run("CREATE TABLE t23(a int) partitioned by (key2 int) " + tableProperty) + .run("CREATE TABLE t3(a int) partitioned by (key2 int) " + tableProperty) + .run("CREATE TABLE t4(a int) partitioned by (key4 int) " + tableProperty) + .run("CREATE TABLE t5(a int) partitioned by (key5 int) " + tableProperty) + .run("CREATE TABLE t6(a int) " + tableProperty); + } + + private void insertRecords(boolean isMultiStmtTxn) throws Throwable { + String txnStrStart = "use " + primaryDbName; + String txnStrCommit = "use " + primaryDbName; + if (isMultiStmtTxn) { + txnStrStart = "START TRANSACTION"; + txnStrCommit = "COMMIT"; + } + + primary.run("use " + primaryDbName).run(txnStrStart) + .run("INSERT INTO t0 partition (key0 = 1) values (1) ") + .run("INSERT INTO t0 partition (key0 = 2) values (2) ") + .run("INSERT INTO t0 partition (key0 = 3) values (3) ") + .run("INSERT INTO t0 partition (key0 = 5) values (5) ") + .run("INSERT INTO t0 partition (key0 = 100) values (100) ") + + .run("INSERT INTO t1 partition (key1 = 1) values (1) ") + .run("INSERT INTO t1 partition (key1 = 2) values (2) ") + .run("INSERT INTO t13 partition (key1 = 1) values (1) ") + + .run("INSERT INTO t2 partition (key2 = 3) values (3) ") + .run("INSERT INTO t23 partition (key2 = 3) values (3) ") + .run("INSERT INTO t2 partition (key2 = 2) values (2) ") + .run("INSERT INTO t23 partition (key2 = 1) values (4) ") + .run("INSERT INTO t23 partition (key2 = 4) values (5) ") + .run("INSERT INTO t3 partition (key2 = 3) values (3) ") + + .run("INSERT INTO t4 partition (key4 = 1) values (3) ") + .run("INSERT INTO t5 partition (key5 = 1) values (3) ") + .run("INSERT INTO t6 values (3) 
") + .run(txnStrCommit); + } + + private void verifyTableContent() throws Throwable { + //For table t0, partition with value 5 is not satisfying the filter condition, thus not replicated. + replica.run("use " + replicatedDbName) + .run("SELECT a from t0 order by a") + .verifyResults(new String[] {"1", "2", "3", "100"}) + + //For t1*, both t1 and t13 are filtered + .run("SELECT a from t1") + .verifyResults(new String[] {"1"}) + .run("SELECT a from t13") + .verifyResults(new String[] {"1"}) + + //For [t2*,t30], t2, t23 and t3 are filtered. + .run("SELECT a from t2") + .verifyResults(new String[] {"3"}) + .run("SELECT a from t23") + .verifyResults(new String[] {"3", "5"}) + .run("SELECT a from t3") + .verifyResults(new String[] {"3"}) + + //For t4, none of the partition satisfies the filter condition. + .run("SELECT a from t4") + .verifyResults(new String[] {}) + + //t5 and t6 are not part of the filter string, thus are not filtered. + .run("SELECT a from t5") + .verifyResults(new String[] {"3"}) + .run("SELECT a from t6") + .verifyResults(new String[] {"3"}); + } + + @Test + public void testPartLevelReplictionBootstrapAcidTable() throws Throwable { + createTables(null); + insertRecords(false); + WarehouseInstance.Tuple bootStrapDump = + primary.dump(basicReplPolicy, null); + replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + verifyTableContent(); + } + + @Test + public void testPartLevelReplictionBootstrapNonAcidTable() throws Throwable { + createTables("'transactional'='false'"); + insertRecords(false); + WarehouseInstance.Tuple bootStrapDump = + primary.dump(basicReplPolicy, null); + replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + verifyTableContent(); + } + + @Test + public void testPartLevelReplictionBootstrapTableFilter() throws Throwable { + createTables(null); + insertRecords(false); + WarehouseInstance.Tuple bootStrapDump = + primary.dump(primaryDbName + ".'t0|t1'" + basicFilter, null); + replica.loadWithoutExplain(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"t0", "t1"}) + .run("SELECT a from t0 order by a") + .verifyResults(new String[] {"1", "2", "3", "100"}) + .run("SELECT a from t1") + .verifyResults(new String[] {"1"}); + } + + @Test + public void testPartLevelReplictionBootstrapDateTypeField() throws Throwable { + String filter = " PARTITIONS 'acid_table' where year(dt) >= 2019 and month(dt) >= 6"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into acid_table partition(dt='1970-01-01') values (1)") + .run("insert into acid_table partition(dt='2019-06-01') values (2)") + .run("insert into acid_table partition(dt='2018-12-01') values (3)") + .run("insert into acid_table partition(dt='2019-01-01') values (4)") + .dump(primaryDbName + filter, null); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResults(new String[] {"2"}); + } + + @Test + public void testPartLevelReplictionBootstrapPatterns() throws Throwable { + String 
filter = " PARTITIONS '(a[0-9]+)|(b)' where (year(dt) >= 2019 and month(dt) >= 6) or (year(dt) == 1947)"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table a1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into a1 partition(dt='1970-01-01') values (1)") + .run("insert into a1 partition(dt='2019-05-01') values (2)") + .run("insert into a1 partition(dt='2019-07-01') values (3)") + .run("insert into a1 partition(dt='1900-08-01') values (4)") + .run("create table b (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b partition(dt='1983-01-01') values (1)") + .run("insert into b partition(dt='1947-05-01') values (2)") + .run("insert into b partition(dt='1947-07-01') values (3)") + .run("create table b1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b1 partition(dt='1983-01-01') values (1)") + .run("insert into b1 partition(dt='1947-05-01') values (2)") + .run("insert into b1 partition(dt='2019-06-01') values (3)") + .dump(primaryDbName + filter, null); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from a1") // a1 is filtered as per 'a[0-9]+' + .verifyResults(new String[] {"3"}) + .run("select a from b") // b is filtered as per 'b' + .verifyResults(new String[] {"2", "3"}) + .run("select a from b1") // b1 is not filtered + .verifyResults(new String[] {"1", "2", "3"}); + } + + @Test + public void testPartLevelReplictionBootstrapExternalTable() throws Throwable { + List loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + List dumpWithClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'" + ); + String filter = " PARTITIONS '(a[0-9]+)|(b)' where (year(dt) >= 2019 and month(dt) >= 6) or (year(dt) == 1947)"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table a1 (a int) partitioned by (dt string)") + .run("insert into a1 partition(dt='1970-01-01') values (1)") + .run("insert into a1 partition(dt='1947-05-01') values (2)") + .run("insert into a1 partition(dt='2019-07-01') values (3)") + .run("insert into a1 partition(dt='1900-08-01') values (4)") + .dump(primaryDbName + filter, null, null, dumpWithClause); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select a from a1") + .verifyResults(new String[] {"2", "3"}); + } + + @Test + public void testPartLevelReplictionIncrAcidTable() throws Throwable { + createTables(null); + WarehouseInstance.Tuple dump = + primary.dump(basicReplPolicy, null); + replica.loadWithoutExplain(replicatedDbName, dump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(dump.lastReplicationId); + insertRecords(false); + dump = primary.dump(basicReplPolicy, dump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, dump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(dump.lastReplicationId); + verifyTableContent(); + } + + @Test + public void testPartLevelReplictionIncrNonAcidTable() throws Throwable { + createTables("'transactional'='false'"); + WarehouseInstance.Tuple dump = + primary.dump(basicReplPolicy, null); + replica.loadWithoutExplain(replicatedDbName, dump.dumpLocation) + .run("REPL STATUS " + 
replicatedDbName) + .verifyResult(dump.lastReplicationId); + insertRecords(false); + dump = primary.dump(basicReplPolicy, dump.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, dump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(dump.lastReplicationId); + verifyTableContent(); + } + + @Test + public void testPartLevelReplictionIncrPatterns() throws Throwable { + String filter = " PARTITIONS '(a[0-9]+)|(b)' where (year(dt) >= 2019 and month(dt) >= 6) or (year(dt) == 1947)"; + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName + filter, null); + replica.load(replicatedDbName, tuple.dumpLocation); + tuple = primary.run("use " + primaryDbName) + .run("create table a1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into a1 partition(dt='1970-01-01') values (1)") + .run("insert into a1 partition(dt='2019-05-01') values (2)") + .run("insert into a1 partition(dt='2019-07-01') values (3)") + .run("insert into a1 partition(dt='1900-08-01') values (4)") + .run("create table b (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b partition(dt='1983-01-01') values (1)") + .run("insert into b partition(dt='1947-05-01') values (2)") + .run("insert into b partition(dt='1947-07-01') values (3)") + .run("create table b1 (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into b1 partition(dt='1983-01-01') values (1)") + .run("insert into b1 partition(dt='1947-05-01') values (2)") + .run("insert into b1 partition(dt='2019-06-01') values (3)") + .dump(primaryDbName + filter, tuple.lastReplicationId); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from a1") // a1 is filtered as per 'a[0-9]+' + .verifyResults(new String[] {"3"}) + .run("select a from b") // b is filtered as per 'b' + .verifyResults(new String[] {"2", "3"}) + .run("select a from b1") // b1 is not filtered + .verifyResults(new String[] {"1", "2", "3"}); + } + + @Test + public void testPartLevelReplictionMultiStmtTxn() throws Throwable { + String tableProperty = "'transactional'='true'"; + createTables(tableProperty); + WarehouseInstance.Tuple tuple = + primary.dump(basicReplPolicy, null); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(tuple.lastReplicationId); + insertRecords(true); + tuple = primary.dump(basicReplPolicy, tuple.lastReplicationId); + replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(tuple.lastReplicationId); + verifyTableContent(); + } + + @Test + public void testPartLevelReplictionFailure() throws Throwable { + String replPolicy; + createTables("'transactional'='true'"); + + // t4 does not have key2 as partition column, so repl dump should fail. + // String replPolicy = primaryDbName + " PARTITIONS " + " 't0' where key0 < 5 and key0 > 0 or key0 = 100," + + // " 't1*' where key1 = 1, '(t2*)|(t3)|(t4)' where key2 > 2"; + // primary.dumpFailure(replPolicy, null); + + // t0 is added twice in filter condition, so repl dump should fail. 
+ replPolicy = primaryDbName + " PARTITIONS " + " t0 where key0 < 5 and key0 > 0 or key0 = 100, t1* where key1 = 1," + + " [t2*,t3] where key2 > 2, t0 where key0 != 5"; + primary.dumpFailure(replPolicy, null); + + // Even if t7 is not a valid table, dump will not fail for this. + replPolicy = primaryDbName + " PARTITIONS " + " 't7' where key > 1"; + primary.dump(replPolicy, null); + } + + @Test + public void testPartLevelReplictionWithClause() throws Throwable { + String filter = "'acid_table' where key0 > 10"; + String replPolicy = primaryDbName + " PARTITIONS " + filter; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (a int) partitioned by (key0 int) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .dump(replPolicy, null, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResult("acid_table"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into acid_table partition(key0=11) values (11)") + .run("insert into acid_table partition(key0=10) values (10)") + .dump(replPolicy, tuple.lastReplicationId, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResult("11"); + + } + + @Test + public void testPartLevelReplictionMulitColPartition() throws Throwable { + String filter = " 'acid_table' where key0 > 10 and (key1 < 10 or key2 = '2019')"; + String replPolicy = primaryDbName + " PARTITIONS " + filter; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (a int) partitioned by (key0 int, key1 int, key2 string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .dump(replPolicy, null); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResult("acid_table"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into acid_table partition(key0=11, key1=11, key2 = '2019') values (11)") + .run("insert into acid_table partition(key0=11, key1=11, key2 = '2018') values (10)") + .run("insert into acid_table partition(key0=10, key1=11, key2 = '2019') values (9)") + .dump(replPolicy, tuple.lastReplicationId, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResult("11"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into acid_table partition(key0=11, key1=11, key2 = '2019') values (12)") + .run("insert into acid_table partition(key0=11, key1=1, key2 = '2018') values (13)") + .run("insert into acid_table partition(key0=11, key1=12, key2 = '2018') values (14)") + .dump(replPolicy, tuple.lastReplicationId, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResults(new String[] {"11", "12", "13"}); + + } + + @Test + public void testPartLevelReplictionMultiConditions() throws Throwable { + String replPolicy = primaryDbName + " PARTITIONS 'acid_table' where key0 > 10 and (key1 < 10 or key2 = '2019')" + + " or (key0 = 10 and key1 >= 12)"; + WarehouseInstance.Tuple 
tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (a int) partitioned by (key0 int, key1 int, key2 string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .dump(replPolicy, null, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResult("acid_table"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into acid_table partition(key0=11, key1=11, key2 = '2019') values (1)") + .run("insert into acid_table partition(key0=11, key1=11, key2 = '2018') values (2)") + .run("insert into acid_table partition(key0=10, key1=12, key2 = '2019') values (3)") + .run("insert into acid_table partition(key0=10, key1=12, key2 = '2018') values (4)") + .dump(replPolicy, tuple.lastReplicationId, + Collections.singletonList("'hive.repl.dump.include.acid.tables' = 'true'")); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResults(new String[] {"1", "3", "4"}); + + } + + @Test + public void testPartLevelReplictionDateTypeField() throws Throwable { + String filter = " 'acid_table' where year(dt) >= 2019 and month(dt) >= 6"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table acid_table (a int) partitioned by (dt string) STORED AS " + + " ORC TBLPROPERTIES ('transactional'='true')") + .run("insert into acid_table partition(dt='1970-01-01') values (1)") + .run("insert into acid_table partition(dt='2019-06-01') values (2)") + .run("insert into acid_table partition(dt='2018-12-01') values (3)") + .run("insert into acid_table partition(dt='2019-01-01') values (4)") + .dump(primaryDbName + " PARTITIONS " + filter, null); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("select a from acid_table") + .verifyResults(new String[] {"2"}); + + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 6326bc34f2..80147d14b4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -255,11 +255,11 @@ WarehouseInstance runFailure(String command, int errorCode) throws Throwable { return this; } - Tuple dump(String dbName, String lastReplicationId, List withClauseOptions) - throws Throwable { + Tuple dump(String replPolicy, String lastReplicationId, List withClauseOptions) + throws Throwable { String dumpCommand = - "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId); - if (!withClauseOptions.isEmpty()) { + "REPL DUMP " + replPolicy + (lastReplicationId == null ? "" : " FROM " + lastReplicationId); + if (withClauseOptions != null && !withClauseOptions.isEmpty()) { dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; } return dump(dumpCommand); @@ -271,7 +271,7 @@ Tuple dump(String replPolicy, String oldReplPolicy, String lastReplicationId, Li "REPL DUMP " + replPolicy + (oldReplPolicy == null ? "" : " REPLACE " + oldReplPolicy) + (lastReplicationId == null ? 
"" : " FROM " + lastReplicationId); - if (!withClauseOptions.isEmpty()) { + if (withClauseOptions != null && !withClauseOptions.isEmpty()) { dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + ")"; } return dump(dumpCommand); @@ -285,13 +285,13 @@ Tuple dump(String dumpCommand) throws Throwable { return new Tuple(dumpLocation, lastDumpId); } - Tuple dump(String dbName, String lastReplicationId) throws Throwable { - return dump(dbName, lastReplicationId, Collections.emptyList()); + Tuple dump(String replPolicy, String lastReplicationId) throws Throwable { + return dump(replPolicy, lastReplicationId, Collections.emptyList()); } - WarehouseInstance dumpFailure(String dbName, String lastReplicationId) throws Throwable { + WarehouseInstance dumpFailure(String replPolicy, String lastReplicationId) throws Throwable { String dumpCommand = - "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId); + "REPL DUMP " + replPolicy + (lastReplicationId == null ? "" : " FROM " + lastReplicationId); advanceDumpDir(); runFailure(dumpCommand); return this; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index ae9d040ef8..f666df23c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -53,7 +53,7 @@ public int execute(DriverContext driverContext) { LOG.debug("Exporting data to: {}", exportPaths.exportRootDir()); work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), - work.getReplicationSpec(), db, null, conf, work.getMmContext()); + work.getReplicationSpec(), db, null, conf, work.getMmContext(), null); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 4cd60cc6ee..b42283b040 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; @@ -83,6 +84,12 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import java.util.Set; +import java.util.HashMap; +import java.util.Map; + import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; public class ReplDumpTask extends Task implements Serializable { @@ -242,6 +249,29 @@ private boolean isTableSatifiesConfig(Table table) { return true; } + private Map> getPartitionFilter(Hive hiveDb) throws Exception { + Map> partitionFilter = new HashMap<>(); + + for (String tableName : hiveDb.getMSC().getAllTables(work.dbNameOrPattern)) { + String filter = work.getPartitionFilter(tableName); + if (filter != null) { + //TODO : May cause issue if the numbber of partitions is too huge. 
+ List partitions = + hiveDb.getMSC().listPartitionsByFilter(work.dbNameOrPattern, tableName, filter, (short)-1); + Set partitionsSet = StreamSupport.stream(partitions.spliterator(), true).map( + partition -> { + if (partition == null) { + return null; + } + return partition.getValues().toString(); + }).collect(Collectors.toSet()); + LOG.info("Added partition filter " + filter + " for table " + tableName); + partitionFilter.put(tableName, partitionsSet); + } + } + return partitionFilter; + } + private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception { Long lastReplId;// get list of events matching dbPattern & tblPattern // go through each event, and dump out each event to a event-level dump dir inside dumproot @@ -297,11 +327,14 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, work.maxEventLimit())); replLogger.startLog(); + + Map> partitionFilter = getPartitionFilter(hiveDb); + while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, cmRoot, hiveDb); + dumpEvent(ev, evRoot, cmRoot, hiveDb, partitionFilter); } replLogger.endLog(lastReplId.toString()); @@ -381,7 +414,8 @@ private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean isIncremen return new Path(dumpRoot, dbName); } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception { + private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db, + Map> partitionFilter) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, cmRoot, @@ -495,7 +529,6 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) { LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri()); Table table = null; - try { HiveWrapper.Tuple tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf); table = tableTuple != null ? 
tableTuple.object : null; @@ -580,9 +613,8 @@ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId)); } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); - new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); - + new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, + mmCtx, work.replScope).write(); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java index 7bae9ac66d..94b53ff34f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.Map; @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, @@ -36,6 +37,7 @@ Long eventTo; private Integer maxEventLimit; static String testInjectDumpDir = null; + private Map partitionFilter; public static void injectNextDumpDirForTest(String dumpDir) { testInjectDumpDir = dumpDir; @@ -52,6 +54,7 @@ public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope, this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; this.maxEventLimit = maxEventLimit; this.resultTempPath = resultTempPath; + this.partitionFilter = partitionFilter; } boolean isBootStrapDump() { @@ -91,4 +94,8 @@ void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception .debug("eventTo not specified, using current event id : {}", eventTo); } } + + public String getPartitionFilter(String tblName) { + return (tblName == null || partitionFilter == null) ? 
null : partitionFilter.get(tblName); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 1df50775c0..d0a236a44d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -26,7 +26,10 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.ddl.table.misc.AlterTableSetPropertiesDesc; @@ -34,7 +37,9 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; @@ -43,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration; @@ -57,6 +63,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.LinkedList; +import java.util.Collections; import java.io.Serializable; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.TableMigrationOption.MANAGED; @@ -293,4 +301,25 @@ public static boolean includeAcidTableInDump(HiveConf conf) { public static boolean tableIncludedInReplScope(ReplScope replScope, String tableName) { return ((replScope == null) || replScope.tableIncludedInReplScope(tableName)); } + + public static boolean isPartSatisfiesFilter(Table tbl, List<String> partValues, ExprNodeDesc partitionFilter, + HiveConf conf) throws HiveException, MetaException { + // Pass the partition name as a linked list (single node) to the partition pruner. If the partition gets pruned, + // it does not satisfy the filter. A linked list is used because the pruner expects a mutable list + // from which it can remove entries. + List<String> partColumnNames = new ArrayList<>(); + List<PrimitiveTypeInfo> partColumnTypeInfos = new ArrayList<>(); + String partName = Warehouse.makePartName(tbl.getPartCols(), partValues); + for (FieldSchema fs : tbl.getPartCols()) { + partColumnNames.add(fs.getName()); + partColumnTypeInfos.add(TypeInfoFactory.getPrimitiveTypeInfo(fs.getType())); + } + LinkedList<String> linkedList = new LinkedList<>(Collections.singletonList(partName)); + PartitionPruner.prunePartitionNames(partColumnNames, partColumnTypeInfos, + (ExprNodeGenericFuncDesc) partitionFilter, + conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), + linkedList); + // If the linked list is empty, the partition does not satisfy the filter.
+ return !linkedList.isEmpty(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java index 79c329d19b..a08a0b3fac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java @@ -18,6 +18,12 @@ package org.apache.hadoop.hive.ql.metadata; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -45,6 +51,7 @@ * IllegalStateException. */ public class PartitionIterable implements Iterable { + private static final Logger LOG = LoggerFactory.getLogger(PartitionIterable.class); @Override public Iterator iterator() { @@ -56,10 +63,26 @@ private Iterator partitionNamesIter = null; private Iterator batchIter = null; + private Iterator partitionFilterIter; + private void initialize(){ if(!initialized){ if (currType == Type.LIST_PROVIDED){ ptnsIterator = ptnsProvided.iterator(); + } else if (currType == Type.FILTER_PROVIDED) { + try { + List result = new ArrayList<>(); + boolean hasUnknown = db.getPartitionsByExpr(table, + (ExprNodeGenericFuncDesc)partitionFilter, db.getConf(), result); + if (hasUnknown) { + throw new SemanticException( + "Unexpected unknown partitions for " + partitionFilter.getExprString()); + } + partitionFilterIter = result.iterator(); + } catch (Exception e) { + LOG.error("Failed to extract partitions for " + table.getDbName() + "." + table.getTableName(), e); + throw new RuntimeException(e.getMessage()); + } } else { partitionNamesIter = partitionNames.iterator(); } @@ -72,6 +95,8 @@ public boolean hasNext() { initialize(); if (currType == Type.LIST_PROVIDED){ return ptnsIterator.hasNext(); + } else if (currType == Type.FILTER_PROVIDED) { + return partitionFilterIter == null ? false : partitionFilterIter.hasNext(); } else { return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext(); } @@ -84,6 +109,10 @@ public Partition next() { return ptnsIterator.next(); } + if (currType == Type.FILTER_PROVIDED) { + return partitionFilterIter.next(); + } + if ((batchIter == null) || !batchIter.hasNext()){ getNextBatch(); } @@ -115,7 +144,8 @@ public void remove() { enum Type { LIST_PROVIDED, // Where a List partitionNames = null; private int batchSize; private boolean getColStats = false; + ExprNodeDesc partitionFilter; + + public PartitionIterable(Hive db, Table tbl, ExprNodeDesc filter) { + this.currType = Type.FILTER_PROVIDED; + this.table = tbl; + this.partitionFilter = filter; + this.db = db; + } /** * Dummy constructor, which simply acts as an iterator on an already-present diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index 4a366a9360..47dc85f603 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -108,7 +108,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { // Note: this tableExport is actually never used other than for auth, and another one is // created when the task is executed. So, we don't care about the correct MM state here. 
TableExport.AuthEntities authEntities = new TableExport( - exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); + exportPaths, ts, replicationSpec, db, null, conf, null, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 73f1804e8c..d3aef3b7f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -398,6 +398,8 @@ TOK_REPL_CONFIG; TOK_REPL_CONFIG_LIST; TOK_REPL_TABLES; TOK_REPL_TABLES_LIST; +TOK_REPL_COND; +TOK_REPL_COND_LIST; TOK_TO; TOK_ONLY; TOK_SUMMARY; @@ -911,7 +913,22 @@ replDbPolicy @init { pushMsg("Repl dump DB replication policy", state); } @after { popMsg(state); } : - (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> $dbName $tablePolicy? + (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? (KW_PARTITIONS partFilter=partitionFilterList)? + -> $dbName $tablePolicy? $partFilter? + ; + +partitionFilterList +@init { pushMsg("Table where clause List for partition filter", state); } +@after { popMsg(state); } + : + partitionFilter (COMMA partitionFilter)* -> ^(TOK_REPL_COND_LIST partitionFilter+) + ; + +partitionFilter +@init { pushMsg("partition filter for each table name pattern", state); } +@after { popMsg(state); } + : + tblName=StringLiteral wh=whereClause -> ^(TOK_REPL_COND $tblName $wh) ; replLoadStatement diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 39789ca22f..25cf78a226 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -65,6 +65,7 @@ import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_STATUS; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_TABLES; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_COND_LIST; public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // Replication Scope @@ -180,6 +181,22 @@ private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException { setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope); } + private void extractPartitionFilter(Tree condNode, ReplScope replScope) throws Exception { + for (int i = 0; i < condNode.getChildCount(); i++) { + Tree node = condNode.getChild(i); + // Table pattern node + String tablePattern = unescapeSQLString(node.getChild(0).getText()); + if (tablePattern == null || tablePattern.isEmpty()) { + throw new SemanticException(ErrorMsg.REPL_INVALID_DB_OR_TABLE_PATTERN); + } + + // Filter Node + Tree filter = node.getChild(1).getChild(0); + LOG.info("Adding filter " + filter.toStringTree() + " to repl scope for pattern list " + tablePattern); + replScope.setPartFilter(tablePattern, filter); + } + } + private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); boolean isMetaDataOnly = false; @@ -236,6 +253,15 @@ private void initReplDump(ASTNode ast) throws HiveException { } break; } + case TOK_REPL_COND_LIST: { + try { + extractPartitionFilter(currNode, replScope); + } catch (Exception e) { + LOG.error("Failed to extract partition filter.", e); + 
throw new HiveException(e.getMessage()); + } + break; + } default: { throw new SemanticException("Unrecognized token " + currNode.getType() + " in REPL DUMP statement."); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8ff00fbb1f..4fd737371e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -11376,6 +11376,20 @@ protected String getAliasId(String alias, QB qb) { return (qb.getId() == null ? alias : qb.getId() + ":" + alias).toLowerCase(); } + public static RowResolver getRowResolverFromTable(Table tbl) { + RowResolver rwsch = new RowResolver(); + String alias = tbl.getTableName(); + for (FieldSchema cols : tbl.getAllCols()) { + rwsch.put(alias, cols.getName(), new ColumnInfo(cols.getName(), + TypeInfoFactory.getPrimitiveTypeInfo(cols.getType()), alias, false)); + } + for (FieldSchema partCol : tbl.getPartCols()) { + rwsch.put(alias, partCol.getName(), new ColumnInfo(partCol.getName(), + TypeInfoFactory.getPrimitiveTypeInfo(partCol.getType()), alias, true)); + } + return rwsch; + } + @SuppressWarnings("nls") private Operator genTablePlan(String alias, QB qb) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 56850417dd..9e091f044b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -30,12 +31,17 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.TypeCheckCtx; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,9 +69,10 @@ private final HiveConf conf; private final Paths paths; private final MmContext mmCtx; + private ExprNodeDesc partitionFilter = null; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx, ReplScope replScope) throws SemanticException { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -83,6 +90,17 @@ public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replication this.conf = conf; this.paths = 
paths; this.mmCtx = mmCtx; + if (replScope != null) { + ASTNode filterNode = (ASTNode) replScope.getPartFilter(tableSpec.tableHandle.getTableName()); + if (filterNode == null) { + logger.info("Partition filter is not set for table " + tableSpec.tableHandle.getTableName()); + } else { + partitionFilter = TypeCheckProcFactory.genExprNode(filterNode, + new TypeCheckCtx(SemanticAnalyzer.getRowResolverFromTable(tableSpec.tableHandle))).get(filterNode); + logger.info("Partition filter " + filterNode.toStringTree() + " is set for table " + + tableSpec.tableHandle.getTableName()); + } + } } public boolean write() throws SemanticException { @@ -108,6 +126,9 @@ private PartitionIterable getPartitions() throws SemanticException { if (replicationSpec.isMetadataOnly()) { return null; } else { + if (partitionFilter != null) { + return new PartitionIterable(db, tableSpec.tableHandle, partitionFilter); + } return new PartitionIterable(db, tableSpec.tableHandle, null, conf.getIntVar( HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX), true); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 42e74b37d9..295b90331c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -84,7 +84,11 @@ public void handle(Context withinContext) throws Exception { return null; } try { - return new Partition(qlMdTable, input); + Partition partition = new Partition(qlMdTable, input); + if (withinContext.isPartitionIncludedInDump(qlMdTable, withinContext.hiveConf, partition.getValues())) { + return partition; + } + return null; } catch (HiveException e) { throw new IllegalArgumentException(e); } @@ -105,6 +109,9 @@ public void handle(Context withinContext) throws Exception { // list would be empty. So, it is enough to check hasNext outside the loop. 
if (partitionFilesIter.hasNext()) { for (Partition qlPtn : qlPtns) { + if (qlPtn == null) { + continue; + } Iterable files = partitionFilesIter.next().getFiles(); if (files != null) { // encoded filename/checksum of files, write into _files diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java index 50e0cd5f1f..123955f974 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java @@ -34,6 +34,7 @@ class AlterPartitionHandler extends AbstractEventHandler { private final org.apache.hadoop.hive.metastore.api.Partition after; + private final org.apache.hadoop.hive.metastore.api.Partition before; private final org.apache.hadoop.hive.metastore.api.Table tableObject; private final boolean isTruncateOp; private final Scenario scenario; @@ -42,7 +43,7 @@ super(event); AlterPartitionMessage apm = eventMessage; tableObject = apm.getTableObj(); - org.apache.hadoop.hive.metastore.api.Partition before = apm.getPtnObjBefore(); + before = apm.getPtnObjBefore(); after = apm.getPtnObjAfter(); isTruncateOp = apm.getIsTruncateOp(); scenario = scenarioType(before, after); @@ -105,6 +106,10 @@ public void handle(Context withinContext) throws Exception { return; } + if (!withinContext.isPartitionIncludedInDump(qlMdTable, withinContext.hiveConf, before.getValues())) { + return; + } + if (Scenario.ALTER == scenario) { withinContext.replicationSpec.setIsMetadataOnly(true); List partitions = new ArrayList<>(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 7d7dc26a25..55c47a9664 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -161,8 +161,6 @@ public void handle(Context withinContext) throws Exception { int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0); if (numEntry != 0) { eventMessage.addWriteEventInfo(writeEventInfoList); - payload = jsonMessageEncoder.getSerializer().serialize(eventMessage); - LOG.debug("payload for commit txn event : " + eventMessageAsJSON); } org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null; @@ -191,8 +189,18 @@ public void handle(Context withinContext) throws Exception { } if (qlMdTable.isPartitioned() && (null != eventMessage.getPartitionObj(idx))) { - qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, - eventMessage.getPartitionObj(idx))); + org.apache.hadoop.hive.ql.metadata.Partition partition = + new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable, + eventMessage.getPartitionObj(idx)); + if (withinContext.isPartitionIncludedInDump(qlMdTable, withinContext.hiveConf, partition.getValues())) { + qlPtns.add(partition); + } else { + // If partition is filtered out, remove its entry from eventMessage. Update the iterators accordingly. 
+ eventMessage.removeWriteEventInfo(idx); + idx--; + numEntry--; + continue; + } } filesTobeAdded.add(Lists.newArrayList( @@ -203,6 +211,13 @@ public void handle(Context withinContext) throws Exception { if (qlMdTablePrev != null) { createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded); } + + // Build the payload after eventMessage is updated properly using filter. If no write event info is there, + // no need to re-create the payload. + if (writeEventInfoList != null && writeEventInfoList.size() != 0) { + payload = jsonMessageEncoder.getSerializer().serialize(eventMessage); + LOG.debug("payload for commit txn event : " + payload); + } } DumpMetaData dmd = withinContext.createDmd(this); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java index 272f5ce90d..f85cfeac0d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java @@ -47,6 +47,8 @@ public void handle(Context withinContext) throws Exception { return; } + //TODO : Even if partition filter is specified, we dump the drop partition message for all partitions. + // The filtering is handled during load, where the event will be skipped as partition will not be present. DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); dmd.write(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index 7d00f89a5b..5400e9edc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -20,11 +20,22 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.TypeCheckCtx; +import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; + import java.util.Set; +import java.util.List; public interface EventHandler { void handle(Context withinContext) throws Exception; @@ -95,5 +106,22 @@ boolean removeFromListOfTablesForBootstrap(String tableName) { assert tableName != null; return tablesForBootstrap.remove(tableName.toLowerCase()); } + + public boolean isPartitionIncludedInDump(Table tbl, HiveConf conf, List partValues) { + try { + ASTNode filterNode = (ASTNode)replScope.getPartFilter(tbl.getTableName()); + if (filterNode == null) { + // Table has no filter, means all partitions are part of dump. 
+ return true; + } + ExprNodeDesc partitionFilter = TypeCheckProcFactory.genExprNode(filterNode, + new TypeCheckCtx(SemanticAnalyzer.getRowResolverFromTable(tbl))).get(filterNode); + return ReplUtils.isPartSatisfiesFilter(tbl, partValues, partitionFilter, conf); + } catch (HiveException e) { + throw new RuntimeException(e.getMessage()); + } catch (MetaException e) { + throw new RuntimeException(e.getMessage()); + } + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 5a18d573cf..f3b94387b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -69,6 +69,9 @@ public void handle(Context withinContext) throws Exception { List qlPtns = null; if (qlMdTable.isPartitioned() && (null != eventMessage.getPtnObj())) { qlPtns = Collections.singletonList(partitionObject(qlMdTable, eventMessage)); + if (!withinContext.isPartitionIncludedInDump(qlMdTable, withinContext.hiveConf, qlPtns.get(0).getValues())) { + return; + } } Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java index 432dd4452f..9e2047fd5c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java @@ -52,10 +52,15 @@ public void handle(Context withinContext) throws Exception { return; } - if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true, + Table tbl = new Table(tableObj); + if (!Utils.shouldReplicate(withinContext.replicationSpec, tbl, true, withinContext.getTablesForBootstrap(), withinContext.oldReplScope, withinContext.hiveConf)) { return; } + + if (!withinContext.isPartitionIncludedInDump(tbl, withinContext.hiveConf, eventMessage.getPartVals())) { + return; + } DumpMetaData dmd = withinContext.createDmd(this); dmd.setPayload(eventMessageAsJSON); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index 898b839355..f8ac9d72ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -104,8 +104,10 @@ private void writePartitions(JsonWriter writer, ReplicationSpec additionalProper writer.jsonGenerator.writeStartArray(); if (partitions != null) { for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { - new PartitionSerializer(partition.getTPartition()) - .writeTo(writer, additionalPropertiesProvider); + if (partition != null) { + new PartitionSerializer(partition.getTPartition()) + .writeTo(writer, additionalPropertiesProvider); + } } } writer.jsonGenerator.writeEndArray(); diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java index 8b26f1ce40..4f12042028 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java +++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.common.repl; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Pattern; /** @@ -33,6 +35,8 @@ private String excludedTableNames; private Pattern includedTableNamePattern; private Pattern excludedTableNamePattern; + private Map<Pattern, Object> partFilter = new HashMap<>(); + private Map<String, Object> partFilterTableMap = new HashMap<>(); public ReplScope() { } @@ -116,4 +120,29 @@ private boolean inTableExcludedList(final String tableName) { } return excludedTableNamePattern.matcher(tableName).matches(); } + + public Object getPartFilter(String tableName) { + // This makes sure that partFilter is scanned only once per table. + if (partFilterTableMap.containsKey(tableName.toLowerCase())) { + return partFilterTableMap.get(tableName.toLowerCase()); + } + Object filter = null; + // The number of patterns is not expected to be huge, so iterating the whole list once per table should not + // be an issue. + for (Pattern pattern : partFilter.keySet()) { + if (pattern.matcher(tableName).matches()) { + if (filter != null) { + throw new RuntimeException("Table " + tableName + " matches multiple partition filter patterns."); + } + filter = partFilter.get(pattern); + } + } + // The filter can be null for cases where the table does not match any of the patterns. + this.partFilterTableMap.put(tableName.toLowerCase(), filter); + return filter; + } + + public void setPartFilter(String patternString, Object filter) { + partFilter.put(Pattern.compile(patternString, Pattern.CASE_INSENSITIVE), filter); + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java index 9733039f06..3ad70517f2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java @@ -56,4 +56,6 @@ protected CommitTxnMessage() { public abstract List<String> getFilesList(); public abstract void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList); + + public abstract void removeWriteEventInfo(int idx); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java index 482fc8e26b..fec8d50677 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java @@ -173,6 +173,17 @@ public void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList) { } } + @Override + public void removeWriteEventInfo(int idx) { + this.databases.remove(idx); + this.tables.remove(idx); + this.writeIds.remove(idx); + this.partitions.remove(idx); + this.tableObjs.remove(idx); + this.partitionObjs.remove(idx); + this.files.remove(idx); + } + @Override public String toString() { try { diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties index 5a9b3ec9fe..e4dcef86a5 100644 ---
a/testutils/ptest2/conf/deployed/master-mr2.properties +++ b/testutils/ptest2/conf/deployed/master-mr2.properties @@ -78,7 +78,7 @@ ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez Test TestStatsReplicationScenariosNoAutogather TestStatsReplicationScenarios \ TestReplicationScenariosAcidTables TestReplAcidTablesBootstrapWithJsonMessage \ TestReplicationScenariosAcidTablesBootstrap TestReplAcidTablesWithJsonMessage \ - TestTableLevelReplicationScenarios \ + TestTableLevelReplicationScenarios TestPartitionLevelReplication \ TestReplicationScenariosExternalTables TestReplAcrossInstancesWithJsonMessageFormat TestReplTableMigrationWithJsonFormat \ TestReplicationWithTableMigration TestReplicationWithTableMigrationEx TestReplicationOnHDFSEncryptedZonesTestReplAcidTablesWithJsonMessage \ TestReplicationScenariosAcrossInstances TestReplicationOfHiveStreaming TestReplScenariosWithStrictManaged TestReplChangeManager
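Reviewer note (not part of the patch): a minimal usage sketch of the new partition-level replication policy, assuming the WarehouseInstance test harness from TestPartitionLevelReplication above; the database names, filter, and local variable names are illustrative placeholders.

// Bootstrap dump: only partitions satisfying the per-table filters are written to the dump directory.
String replPolicy = "srcdb" + " PARTITIONS 't0' where key0 < 5 and key0 > 0 or key0 = 100, 't1*' where key1 = 1";
WarehouseInstance.Tuple bootstrap = primary.dump(replPolicy, null);
replica.load("srcdb_replica", bootstrap.dumpLocation);
// Incremental dump: events touching filtered-out partitions are skipped by the event handlers.
WarehouseInstance.Tuple incremental = primary.dump(replPolicy, bootstrap.lastReplicationId);
replica.load("srcdb_replica", incremental.dumpLocation);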