diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 2a48527a31..ae72d1b077 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -42,6 +44,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -58,19 +61,21 @@ public TestRule replV1BackwardCompat;
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica;
-  private static MiniDFSCluster miniDFSCluster;
+  private String primaryDbName, replicatedDbName;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
-    miniDFSCluster =
+    MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    primary = new WarehouseInstance(LOG, miniDFSCluster);
-    replica = new WarehouseInstance(LOG, miniDFSCluster);
+    HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+    }};
+    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
   }
 
   @AfterClass
@@ -79,8 +84,6 @@ public static void classLevelTearDown() throws IOException {
     replica.close();
   }
 
-  private String primaryDbName, replicatedDbName;
-
   @Before
   public void setup() throws Throwable {
     replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
@@ -89,6 +92,12 @@ public void setup() throws Throwable {
     primary.run("create database " + primaryDbName);
   }
 
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+  }
+
   @Test
   public void testCreateFunctionIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
@@ -97,7 +106,7 @@
         .verifyResult(bootStrapDump.lastReplicationId);
 
     primary.run("CREATE FUNCTION " + primaryDbName
-        + ".testFunction as 'hivemall.tools.string.StopwordUDF' "
+        + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' "
         + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'");
 
     WarehouseInstance.Tuple incrementalDump =
@@ -106,41 +115,41 @@
.run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verifyResult(replicatedDbName + ".testFunction"); + .verifyResult(replicatedDbName + ".testFunctionOne"); // Test the idempotent behavior of CREATE FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName) + .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") - .verifyResult(replicatedDbName + ".testFunction"); + .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); } @Test public void testDropFunctionIncrementalReplication() throws Throwable { primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + ".testFunctionAnother as 'hivemall.tools.string.StopwordUDF' " + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'"); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); + primary.run("Drop FUNCTION " + primaryDbName + ".testFunctionAnother "); WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .run("SHOW FUNCTIONS LIKE '*testfunctionanother*'") .verifyResult(null); // Test the idempotent behavior of DROP FUNCTION replica.load(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName) + .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId) - .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .run("SHOW FUNCTIONS LIKE '*testfunctionanother*'") .verifyResult(null); } @@ -254,7 +263,7 @@ public void testMultipleStagesOfReplicationLoadTask() throws Throwable { } @Test - public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable { + public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable { WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create table t1 (id int)") @@ -280,6 +289,7 @@ public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable { .run("select country from t2") .verifyResults(Arrays.asList("india", "australia", "russia", "uk", "us", "france", "japan", "china")); + replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, false); } @Test @@ -376,4 +386,63 @@ public void testIncrementalMetadataReplication() throws Throwable { "custom.value\t " }); } + + @Test + public void testBootStrapDumpOfWarehouse() throws Throwable { + String randomOne = RandomStringUtils.random(10, true, false); + String randomTwo = RandomStringUtils.random(10, true, false); + String dbOne = primaryDbName + randomOne; + String dbTwo = primaryDbName + randomTwo; + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (i int, j int)") + .run("create database " + dbOne) + .run("use " + dbOne) + .run("create table t1 (i int, j int) partitioned by (load_date date) " + + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') 
") + .run("create database " + dbTwo) + .run("use " + dbTwo) + .run("create table t1 (i int, j int)") + .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'", + "'hive.repl.dump.include.acid.tables'='true'")); + + /* + Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM + we are not able to create multiple embedded derby instances for two different MetaStore instances. + */ + + primary.run("drop database " + primaryDbName + " cascade"); + primary.run("drop database " + dbOne + " cascade"); + primary.run("drop database " + dbTwo + " cascade"); + + /* + End of additional steps + */ + + replica.run("show databases") + .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .load("", tuple.dumpLocation) + .run("show databases") + .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) + .run("use " + primaryDbName) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbOne) + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("use " + dbTwo) + .run("show tables") + .verifyResults(new String[] { "t1" }); + /* + Start of cleanup + */ + + replica.run("drop database " + primaryDbName + " cascade"); + replica.run("drop database " + dbOne + " cascade"); + replica.run("drop database " + dbTwo + " cascade"); + + /* + End of cleanup + */ + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 0918d33a21..86bcb8934a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -48,7 +48,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -87,25 +86,13 @@ initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf); } - WarehouseInstance(Logger logger, MiniDFSCluster cluster, String keyNameForEncryptedZone) - throws Exception { - this(logger, cluster, new HashMap() {{ - put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); - }}, keyNameForEncryptedZone); - } - WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map overridesForHiveConf) throws Exception { this(logger, cluster, overridesForHiveConf, null); } - WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { - this(logger, cluster, (String) null); - } - private void initialize(String cmRoot, String warehouseRoot, - Map overridesForHiveConf) - throws Exception { + Map overridesForHiveConf) throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); for (Map.Entry entry : overridesForHiveConf.entrySet()) { hiveConf.set(entry.getKey(), entry.getValue()); @@ -129,7 +116,6 @@ private void initialize(String cmRoot, String warehouseRoot, hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot); hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot); - System.setProperty("datanucleus.mapping.Schema", "APP"); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:memory:${test.tmp.dir}/APP;create=true"); hiveConf.setVar(HiveConf.ConfVars.REPLDIR, diff --git 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index 432e394381..91ec93e025 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -79,8 +79,7 @@ void updateDbEventState(DatabaseEvent.State state) {
   }
 
   DatabaseEvent databaseEvent(HiveConf hiveConf) {
-    DatabaseEvent databaseEvent = state.toEvent(hiveConf);
-    return databaseEvent;
+    return state.toEvent(hiveConf);
   }
 
   boolean hasDbState() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index b9f2d0a837..cfd16405fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,7 +66,8 @@ public Path metadataPath() {
   public ImportTableDesc tableDesc(String dbName) throws SemanticException {
     try {
       Table table = new Table(metadata.getTable());
-      ImportTableDesc tableDesc = new ImportTableDesc(dbName, table);
+      ImportTableDesc tableDesc =
+          new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table);
       tableDesc.setReplicationSpec(metadata.getReplicationSpec());
       return tableDesc;
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 87821fddfe..89837be100 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -247,9 +247,8 @@ public static void createDbExportDump(FileSystem fs, Path metadataPath, Database
 
     // Remove all the entries from the parameters which are added for bootstrap dump progress
     Map<String, String> parameters = dbObj.getParameters();
-    Map<String, String> tmpParameters = new HashMap<>();
     if (parameters != null) {
-      tmpParameters.putAll(parameters);
+      Map<String, String> tmpParameters = new HashMap<>(parameters);
       tmpParameters.entrySet()
          .removeIf(e -> e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
       dbObj.setParameters(tmpParameters);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index c1c1fd3263..c69ecc9405 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -22,9 +22,11 @@
 import org.apache.hadoop.hive.ql.parse.repl.load.message.AddPrimaryKeyHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.AddUniqueConstraintHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.AlterDatabaseHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateDatabaseHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropConstraintHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropDatabaseHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler;
@@ -169,6 +171,18 @@ public MessageHandler handler() {
     public MessageHandler handler() {
       return new DefaultHandler();
     }
-  };
+  },
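+  // Database-level lifecycle events: these let REPL LOAD replay CREATE/DROP DATABASE
+  // operations, which warehouse-scoped dumps (repl dump `*`) rely on.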
+  EVENT_CREATE_DATABASE("EVENT_CREATE_DATABASE") {
+    @Override
+    public MessageHandler handler() {
+      return new CreateDatabaseHandler();
+    }
+  },
+  EVENT_DROP_DATABASE("EVENT_DROP_DATABASE") {
+    @Override
+    public MessageHandler handler() {
+      return new DropDatabaseHandler();
+    }
+  };
 
   String type = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
new file mode 100644
index 0000000000..21eb74b52e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateDatabaseHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+
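+/**
+ * Dump-side handler for CREATE_DATABASE events: it re-serializes the Database object
+ * carried by the notification message into the _metadata file under the event root,
+ * which the load side reads back to recreate the database on the replica.
+ */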
+class CreateDatabaseHandler extends AbstractEventHandler {
+  CreateDatabaseHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} CREATE_DATABASE message : {}", fromEventId(), event.getMessage());
+    CreateDatabaseMessage createDatabaseMsg =
+        deserializer.getCreateDatabaseMessage(event.getMessage());
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    FileSystem fileSystem = metaDataPath.getFileSystem(withinContext.hiveConf);
+    EximUtil.createDbExportDump(fileSystem, metaDataPath, createDatabaseMsg.getDatabaseObject(),
+        withinContext.replicationSpec);
+    withinContext.createDmd(this).write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_CREATE_DATABASE;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
index 6b709a6d52..979e9a124f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropConstraintHandler.java
@@ -21,7 +21,7 @@
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-public class DropConstraintHandler extends AbstractEventHandler {
+class DropConstraintHandler extends AbstractEventHandler {
   DropConstraintHandler(NotificationEvent event) {
     super(event);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
new file mode 100644
index 0000000000..4eae7783d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropDatabaseHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
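+/**
+ * Dump-side handler for DROP_DATABASE events. The raw notification message is enough
+ * to replay the drop, so it is written out as the DumpMetaData payload and no
+ * _metadata file is produced.
+ */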
+class DropDatabaseHandler extends AbstractEventHandler {
+  DropDatabaseHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_DATABASE message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_DATABASE;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index dc197414e7..9955246ff8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -48,6 +48,8 @@ private EventHandlerFactory() {
     register(MessageFactory.ADD_UNIQUECONSTRAINT_EVENT, AddUniqueConstraintHandler.class);
     register(MessageFactory.ADD_NOTNULLCONSTRAINT_EVENT, AddNotNullConstraintHandler.class);
     register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
+    register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
+    register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java
new file mode 100644
index 0000000000..68b94e3473
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateDatabaseHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
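+/**
+ * Load-side counterpart of the dump CreateDatabaseHandler: reads the Database object
+ * back from the event's _metadata file, creates the database with ifNotExists set so
+ * that replaying the event stays idempotent, and chains an ALTER DATABASE task to
+ * restore the owner when one is recorded.
+ */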
+public class CreateDatabaseHandler extends AbstractMessageHandler {
+
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    MetaData metaData;
+    try {
+      FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf);
+      metaData = EximUtil.readMetaData(fs, new Path(context.location, EximUtil.METADATA_NAME));
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+    }
+    Database db = metaData.getDatabase();
+    String destinationDBName =
+        StringUtils.isBlank(context.dbName) ? db.getName() : context.dbName;
+
+    CreateDatabaseDesc createDatabaseDesc =
+        new CreateDatabaseDesc(destinationDBName, db.getDescription(), db.getLocationUri(), true);
+    createDatabaseDesc.setDatabaseProperties(db.getParameters());
+    Task<? extends Serializable> createDBTask = TaskFactory.get(
+        new DDLWork(new HashSet<>(), new HashSet<>(), createDatabaseDesc), context.hiveConf);
+    if (StringUtils.isNotEmpty(db.getOwnerName())) {
+      AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(destinationDBName,
+          new PrincipalDesc(db.getOwnerName(), db.getOwnerType()),
+          context.eventOnlyReplicationSpec());
+      Task<? extends Serializable> alterDbTask = TaskFactory
+          .get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), context.hiveConf);
+      createDBTask.addDependentTask(alterDbTask);
+    }
+    return Collections.singletonList(createDBTask);
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
index 459fac500d..d9d185b4a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropConstraintHandler.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -29,6 +24,10 @@
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
 public class DropConstraintHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context)
@@ -41,8 +40,6 @@ AlterTableDesc dropConstraintsDesc = new AlterTableDesc(actualDbName + "."
         + actualTblName, constraintName, context.eventOnlyReplicationSpec());
     Task<? extends Serializable> dropConstraintsTask = TaskFactory.get(
         new DDLWork(readEntitySet, writeEntitySet, dropConstraintsDesc), context.hiveConf);
-    List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
-    tasks.add(dropConstraintsTask);
     context.log.debug("Added drop constraint task : {}:{}", dropConstraintsTask.getId(), actualTblName);
     updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, null);
     return Collections.singletonList(dropConstraintsTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java
new file mode 100644
index 0000000000..5abbece9ef
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropDatabaseHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropDatabaseDesc;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
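+/**
+ * Load-side handler for DROP_DATABASE events: issues the drop with ifExists set, so
+ * replaying the same event against a replica where the database is already gone
+ * does not fail.
+ */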
+public class DropDatabaseHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    DropDatabaseMessage msg =
+        deserializer.getDropDatabaseMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    DropDatabaseDesc desc = new DropDatabaseDesc(actualDbName, true);
+    Task<? extends Serializable> dropDBTask =
+        TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), desc), context.hiveConf);
+    context.log.info(
+        "Added drop database task : {}:{}", dropDBTask.getId(), desc.getDatabaseName());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, null, null);
+    return Collections.singletonList(dropDBTask);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 8de4844b2e..96e3fca899 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -90,6 +90,16 @@ private static void assertDatabase(final int expectedNumberOfChildren, ASTNode r
 
   public static class ReplDump {
 
+    @Test
+    public void parseDbPattern() throws ParseException {
+      ASTNode root = parse("repl dump `*`");
+      assertEquals("TOK_REPL_DUMP", root.getText());
+      assertEquals(1, root.getChildCount());
+      ASTNode child = (ASTNode) root.getChild(0);
+      assertEquals("`*`", child.getText());
+      assertEquals(0, child.getChildCount());
+    }
+
     @Test
     public void parseDb() throws ParseException {
       ASTNode root = parse("repl dump testDb");
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
index 328c1180e1..3d64c7341a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -19,10 +19,13 @@
 
 package org.apache.hadoop.hive.metastore.messaging;
 
+import org.apache.hadoop.hive.metastore.api.Database;
+
 public abstract class CreateDatabaseMessage extends EventMessage {
 
   protected CreateDatabaseMessage() {
     super(EventType.CREATE_DATABASE);
   }
+
+  public abstract Database getDatabaseObject() throws Exception;
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
index f442e99923..b7e93aed51 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCreateDatabaseMessage.java
@@ -19,7 +19,9 @@
 
 package org.apache.hadoop.hive.metastore.messaging.json;
 
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage;
+import org.apache.thrift.TException;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 /**
@@ -28,7 +30,7 @@ public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
 
   @JsonProperty
-  String server, servicePrincipal, db;
+  String server, servicePrincipal, db, dbJson;
 
   @JsonProperty
   Long timestamp;
@@ -38,14 +41,25 @@
    */
   public JSONCreateDatabaseMessage() {}
 
-  public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+  public JSONCreateDatabaseMessage(String server, String servicePrincipal, Database db,
+      Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
-    this.db = db;
+    this.db = db.getName();
     this.timestamp = timestamp;
+    try {
+      this.dbJson = JSONMessageFactory.createDatabaseObjJson(db);
+    } catch (TException ex) {
+      throw new IllegalArgumentException("Could not serialize Database object", ex);
+    }
     checkValid();
   }
 
+  @Override
+  public Database getDatabaseObject() throws Exception {
+    return (Database) JSONMessageFactory.getTObj(dbJson, Database.class);
+  }
+
   @Override
   public String getDB() {
     return db;
   }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index a9fe19649a..7f46d071fe 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -99,7 +99,7 @@ public String getMessageFormat() {
 
   @Override
   public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
-    return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now());
+    return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now());
   }
 
   @Override