diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 5621f26dd4..3c1ef082cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -27,12 +27,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+
 public class TestReplicationScenariosAcrossInstances {
   @Rule
   public final TestName testName = new TestName();
 
   @Rule
-  public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule();
+  public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>());
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
 
@@ -64,7 +66,7 @@ private String dropCommand(String dbName) {
   }
 
   @Test
-  public void testIncrementalFunctionReplication() throws Throwable {
+  public void testCreateFunctionIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
     replica.load(replicatedDbName, bootStrapDump.dumpLocation)
         .run("REPL STATUS " + replicatedDbName)
@@ -83,6 +85,26 @@ public void testIncrementalFunctionReplication() {
   }
 
   @Test
+  public void testDropFunctionIncrementalReplication() throws Throwable {
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(bootStrapDump.lastReplicationId);
+
+    primary.run("Drop FUNCTION " + primaryDbName + ".testFunction ");
+
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '*testfunction*'")
+        .verify(null);
+  }
+
+  @Test
   public void testBootstrapFunctionReplication() throws Throwable {
     primary.run("CREATE FUNCTION " + primaryDbName
         + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f8bb24884d..7271eaea72 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -42,9 +42,8 @@
 
   private Driver driver;
   private HiveMetaStoreClient client;
   private HiveConf hconf;
-  private ReplicationV1CompatRule bcompat = null;
-
+  private static int schemaNameCounter = 0;
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
   /**
@@ -55,7 +54,6 @@
     this.driver = other.driver;
     this.client = other.client;
     this.hconf = other.hconf;
-    this.bcompat = other.bcompat;
   }
 
   WarehouseInstance() throws Exception {
@@ -65,7 +63,7 @@
         + Path.SEPARATOR
         + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
         + "_"
-        + System.currentTimeMillis();
+        + System.nanoTime();
 
     if (metaStoreUri != null) {
       hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
@@ -78,6 +76,10 @@
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/");
+    String schemaName = "APP" + schemaNameCounter++;
+    System.setProperty("datanucleus.mapping.Schema", schemaName);
+    hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
+        "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true");
     int metaStorePort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
@@ -95,8 +97,6 @@
     driver = new Driver(hconf);
     SessionState.start(new CliSessionState(hconf));
     client = new HiveMetaStoreClient(hconf);
-
-    bcompat = new ReplicationV1CompatRule(client,hconf);
   }
 
   private int next = 0;
@@ -148,7 +148,7 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro
   }
 
   WarehouseInstance verify(String data) throws IOException {
-    verifyResults(new String[] { data });
+    verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;
   }
 
@@ -186,12 +186,8 @@ private void printOutput() throws IOException {
     }
   }
 
-  public TestRule getReplivationV1CompatRule(){
-    return bcompat;
-  }
-
-  public void doBackwardCompatibilityCheck(boolean eventsMustExist) {
-    bcompat.doBackwardCompatibilityCheck(eventsMustExist);
+  public ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
+    return new ReplicationV1CompatRule(client, hconf, testsToSkip);
   }
 
   static class Tuple {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 5fa98088dc..05805466e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.InsertHandler;
@@ -49,6 +50,12 @@ public MessageHandler handler() {
       return new DropTableHandler();
     }
   },
+  EVENT_DROP_FUNCTION("EVENT_DROP_FUNCTION") {
+    @Override
+    public MessageHandler handler() {
+      return new DropFunctionHandler();
+    }
+  },
   EVENT_DROP_PARTITION("EVENT_DROP_PARTITION") {
     @Override
     public MessageHandler handler() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
new file mode 100644
index 0000000000..352b0ccbd2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropFunctionHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class DropFunctionHandler extends AbstractEventHandler {
+
+  DropFunctionHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} DROP_FUNCTION message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_DROP_FUNCTION;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 08dbd13822..7e655fad1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -41,6 +41,7 @@ private EventHandlerFactory() {
     register(MessageFactory.DROP_PARTITION_EVENT, DropPartitionHandler.class);
     register(MessageFactory.DROP_TABLE_EVENT, DropTableHandler.class);
     register(MessageFactory.INSERT_EVENT, InsertHandler.class);
+    register(MessageFactory.DROP_FUNCTION_EVENT, DropFunctionHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
new file mode 100644
index 0000000000..daf7b2aafb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropFunctionHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DropFunctionDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class DropFunctionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    DropFunctionMessage msg = deserializer.getDropFunctionMessage(context.dmd.getPayload());
+    String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
+    String qualifiedFunctionName =
+        FunctionUtils.qualifyFunctionName(msg.getFunctionName(), actualDbName);
+    DropFunctionDesc desc = new DropFunctionDesc(qualifiedFunctionName, false);
+    Task<? extends Serializable> dropFunctionTask = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
+    context.log.debug(
+        "Added drop function task : {}:{}", dropFunctionTask.getId(), desc.getFunctionName()
+    );
+    databasesUpdated.put(actualDbName, context.dmd.getEventTo());
+    return Collections.singletonList(dropFunctionTask);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index 943a6a6665..e6e06c339c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -38,13 +38,15 @@
     DropTableDesc dropTableDesc = new DropTableDesc(
         actualDbName + "." + actualTblName, null, true, true,
-        eventOnlyReplicationSpec(context));
+        eventOnlyReplicationSpec(context)
+    );
     Task<? extends Serializable> dropTableTask = TaskFactory.get(
         new DDLWork(readEntitySet, writeEntitySet, dropTableDesc), context.hiveConf
     );
-    context.log
-        .debug("Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName());
+    context.log.debug(
+        "Added drop tbl task : {}:{}", dropTableTask.getId(), dropTableDesc.getTableName()
+    );
     databasesUpdated.put(actualDbName, context.dmd.getEventTo());
     return Collections.singletonList(dropTableTask);
   }
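
Reviewer note: below is a minimal, self-contained sketch (not part of this patch) of how the dump side resolves the new event type once the register(...) call above is in place. It assumes EventHandlerFactory exposes the static handlerFor(NotificationEvent) lookup used by the other dump-side handlers and that the thrift-generated four-argument NotificationEvent constructor is available; the event id and message payload are placeholders, and the class name is hypothetical.

    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
    import org.apache.hadoop.hive.ql.parse.repl.DumpType;
    import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
    import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;

    public class DropFunctionEventRoutingSketch {
      public static void main(String[] args) {
        // Build a DROP_FUNCTION notification event. In a real run the JSON
        // payload is produced by DbNotificationListener, not hand-written.
        NotificationEvent event =
            new NotificationEvent(42L, 0, MessageFactory.DROP_FUNCTION_EVENT, "{}");

        // With DROP_FUNCTION_EVENT registered in EventHandlerFactory, the
        // factory resolves this event to the new dump-side DropFunctionHandler
        // instead of falling back to DefaultHandler, so an incremental REPL DUMP
        // writes an EVENT_DROP_FUNCTION entry that REPL LOAD then routes to the
        // new load/message/DropFunctionHandler added in this patch.
        EventHandler handler = EventHandlerFactory.handlerFor(event);
        System.out.println(handler.dumpType() == DumpType.EVENT_DROP_FUNCTION); // true
      }
    }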