diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index e4cc799aca..1ed209ace6 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -59,7 +59,7 @@ private String dropCommand(String dbName) { } @Test - public void testIncrementalFunctionReplication() throws Throwable { + public void testCreateFunctionIncrementalReplication() throws Throwable { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) @@ -78,6 +78,26 @@ public void testIncrementalFunctionReplication() throws Throwable { } @Test + public void testDropFunctionIncrementalReplication() throws Throwable { + primary.run("CREATE FUNCTION " + primaryDbName + + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " + + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'"); + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verify(bootStrapDump.lastReplicationId); + + primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); + + WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verify(incrementalDump.lastReplicationId) + .run("SHOW FUNCTIONS LIKE '*testfunction*'") + .verify(null); + } + + @Test public void testBootstrapFunctionReplication() throws Throwable { primary.run("CREATE FUNCTION " + primaryDbName + ".testFunction as 
'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index e26d54ce39..b357c712fc 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -40,7 +40,7 @@ private Driver driver; private HiveMetaStoreClient client; private HiveConf hconf; - + private static int schemaNameCounter = 0; private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName(); /** @@ -60,7 +60,7 @@ + Path.SEPARATOR + TestReplicationScenarios.class.getCanonicalName().replace('.', '_') + "_" - + System.currentTimeMillis(); + + +System.nanoTime(); if (metaStoreUri != null) { hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); @@ -73,6 +73,10 @@ hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/"); + String schemaName = "APP" + schemaNameCounter++; + System.setProperty("datanucleus.mapping.Schema", schemaName); + hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, + "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true"); int metaStorePort = MetaStoreUtils.startMetaStore(hconf); hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/"); hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort); @@ -141,7 +145,7 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro } WarehouseInstance verify(String data) throws IOException { - verifyResults(new String[] { data }); + verifyResults(data == null ? 
new String[] {} : new String[] { data }); return this; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index 5fa98088dc..05805466e9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.InsertHandler; @@ -49,6 +50,12 @@ public MessageHandler handler() { return new DropTableHandler(); } }, + EVENT_DROP_FUNCTION("EVENT_DROP_FUNCTION") { + @Override + public MessageHandler handler() { + return new DropFunctionHandler(); + } + }, EVENT_DROP_PARTITION("EVENT_DROP_PARTITION") { @Override public MessageHandler handler() { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java new file mode 100644 index 0000000000..352b0ccbd2 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class DropFunctionHandler extends AbstractEventHandler { + + DropFunctionHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} DROP_FUNCTION message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_DROP_FUNCTION; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index 08dbd13822..7e655fad1a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -41,6 +41,7 @@ private EventHandlerFactory() { register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class); register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class); register(MessageFactory.INSERT_EVENT, InsertHandler.class); + register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class); } static void register(String event, Class handlerClazz) { diff --git 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java new file mode 100644 index 0000000000..daf7b2aafb --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.hadoop.hive.ql.exec.FunctionUtils; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.DropFunctionDesc; +import org.apache.hadoop.hive.ql.plan.FunctionWork; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +public class DropFunctionHandler extends AbstractMessageHandler { + @Override + public List> handle(Context context) + throws SemanticException { + DropFunctionMessage msg = deserializer.getDropFunctionMessage(context.dmd.getPayload()); + String actualDbName = context.isDbNameEmpty() ? 
msg.getDB() : context.dbName; + String qualifiedFunctionName = + FunctionUtils.qualifyFunctionName(msg.getFunctionName(), actualDbName); + DropFunctionDesc desc = new DropFunctionDesc(qualifiedFunctionName, false); + Task dropFunctionTask = TaskFactory.get(new FunctionWork(desc), context.hiveConf); + context.log.debug( + "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName() + ); + databasesUpdated.put(actualDbName, context.dmd.getEventTo()); + return Collections.singletonList(dropFunctionTask); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java index 943a6a6665..e6e06c339c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java @@ -38,13 +38,15 @@ DropTableDesc dropTableDesc = new DropTableDesc( actualDbName + "." + actualTblName, null, true, true, - eventOnlyReplicationSpec(context)); + eventOnlyReplicationSpec(context) + ); Task dropTableTask = TaskFactory.get( new DDLWork(readEntitySet, writeEntitySet, dropTableDesc), context.hiveConf ); - context.log - .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()); + context.log.debug( + "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName() + ); databasesUpdated.put(actualDbName, context.dmd.getEventTo()); return Collections.singletonList(dropTableTask); }