diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index f86f2ac2c0..ee78979ac7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -27,6 +28,9 @@
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -35,10 +39,14 @@
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Assert;
@@ -1184,6 +1192,78 @@ public void testExternalTableBaseDirMandatory() throws Throwable {
         .verifyReplTargetProperty(replicatedDbName);
   }
 
+  @Test
+  public void differentCatalogIncrementalReplication() throws Throwable {
+    //Create the catalog
+    Catalog catalog = new Catalog();
+    catalog.setName("spark");
+    Warehouse wh = new Warehouse(conf);
+    catalog.setLocationUri(wh.getWhRootExternal().toString() + File.separator + catalog.getName());
+    catalog.setDescription("Non-hive catalog");
+    Hive.get(primary.hiveConf).getMSC().createCatalog(catalog);
+
+    //Create database and table in spark catalog
+    String sparkDbName = "src_spark";
+    Database sparkdb = new Database();
+    sparkdb.setCatalogName("spark");
+    sparkdb.setName(sparkDbName);
+    Hive.get(primary.hiveConf).getMSC().createDatabase(sparkdb);
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(),
+        new HashMap<String, String>());
+    ArrayList<FieldSchema> cols = new ArrayList<>(1);
+    cols.add(new FieldSchema("place", serdeConstants.STRING_TYPE_NAME, ""));
+    StorageDescriptor sd
+        = new StorageDescriptor(cols, null,
+        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+        false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<>();
+
+    Table sparkTable = new Table("mgt1", sparkDbName, "", 0, 0, 0,
+        sd, null, tableParameters, "", "", "");
+    sparkTable.setCatName("spark");
+    Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+    //create same db in hive catalog
+    Map<String, String> params = new HashMap<>();
+    params.put(SOURCE_OF_REPLICATION, "1");
+    Database hiveDb = new Database();
+    hiveDb.setCatalogName("hive");
+    hiveDb.setName(sparkDbName);
+    hiveDb.setParameters(params);
+    Hive.get(primary.hiveConf).getMSC().createDatabase(hiveDb);
+
+    primary.dump(sparkDbName);
+    //spark tables are not replicated in bootstrap
+    replica.load(replicatedDbName, sparkDbName)
+        .run("use " + replicatedDbName)
+        .run("show tables like 'mgt1'")
+        .verifyResult(null);
+
+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/t1/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    //Create another table in spark
+    sparkTable = new Table("mgt2", sparkDbName, "", 0, 0, 0,
+        sd, null, tableParameters, "", "", "");
+    sparkTable.setCatName("spark");
+    Hive.get(primary.hiveConf).getMSC().createTable(sparkTable);
+
+    //Incremental load shouldn't copy any events from spark catalog
+    primary.dump(sparkDbName);
+    replica.load(replicatedDbName, sparkDbName)
+        .run("use " + replicatedDbName)
+        .run("show tables like 'mgt1'")
+        .verifyResult(null)
+        .run("show tables like 'mgt2'")
+        .verifyResult(null);
+
+    primary.run("drop database if exists " + sparkDbName + " cascade");
+  }
+
   private void assertExternalFileInfo(List<String> expected, String dumplocation,
                                       boolean isIncremental) throws IOException {
     assertExternalFileInfo(expected, dumplocation, null, isIncremental);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f261889c86..080a2e0d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -41,8 +41,10 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -514,6 +516,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new ReplEventFilter(work.replScope),
+        new CatalogFilter(MetaStoreUtils.getDefaultCatalog(conf)),
         new EventBoundaryFilter(work.eventFrom, work.eventTo));
     EventUtils.MSClientNotificationFetcher evFetcher
         = new EventUtils.MSClientNotificationFetcher(hiveDb);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
new file mode 100644
index 0000000000..bb2472f661
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/event/filters/CatalogFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging.event.filters;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * Notification event filter that accepts only events belonging to the given catalog name.
+ */
+public class CatalogFilter extends BasicFilter {
+  private final String catalogName;
+
+  public CatalogFilter(final String catalogName) {
+    this.catalogName = catalogName;
+  }
+
+  @Override
+  boolean shouldAccept(final NotificationEvent event) {
+    if (catalogName == null || catalogName.equalsIgnoreCase(event.getCatName())) {
+      return true;
+    }
+    return false;
+  }
+}
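
For context, a minimal standalone sketch (not part of the patch) of the filter semantics the new class provides. It assumes BasicFilter's public accept(NotificationEvent) entry point, which null-checks the event and delegates to the package-private shouldAccept, and the Thrift-generated NotificationEvent constructor and setCatName setter; the class name, event ids, and JSON message bodies below are placeholders:

import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;

public class CatalogFilterSketch {
  public static void main(String[] args) {
    // An event raised in the "spark" catalog, as in the test above.
    NotificationEvent sparkEvent = new NotificationEvent(1L, 0, "CREATE_TABLE", "{}");
    sparkEvent.setCatName("spark");

    // An event raised in the default "hive" catalog.
    NotificationEvent hiveEvent = new NotificationEvent(2L, 0, "CREATE_TABLE", "{}");
    hiveEvent.setCatName("hive");

    // ReplDumpTask constructs this with MetaStoreUtils.getDefaultCatalog(conf).
    CatalogFilter filter = new CatalogFilter("hive");
    System.out.println(filter.accept(sparkEvent)); // false: skipped by the incremental dump
    System.out.println(filter.accept(hiveEvent));  // true: included in the incremental dump

    // Note: an event whose catName is unset is also rejected, because
    // "hive".equalsIgnoreCase(null) returns false; only a null filter
    // catalog accepts every event.
  }
}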