diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
similarity index 99%
rename from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
rename to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 5173d8b4fc..96bd168aec 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql;
+package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,12 +28,13 @@
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -56,8 +57,8 @@
 import java.util.List;
 import java.util.Map;
 
-import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -148,19 +149,17 @@ private synchronized void advanceDumpDir() {
     ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
   }
 
-  @Test
-  public void testFunctionReplicationAsPartOfBootstrap() throws IOException {
-    String dbName = createDB(testName.getMethodName());
-    run("CREATE FUNCTION " + dbName
-        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
-        + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
-
-    String replicatedDbName = loadAndVerify(dbName);
-    run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'");
-    verifyResults(new String[] { replicatedDbName + ".testFunction" });
+  static class Tuple {
+    final String replicatedDbName;
+    final String lastReplicationId;
+
+    Tuple(String replicatedDbName, String lastReplicationId) {
+      this.replicatedDbName = replicatedDbName;
+      this.lastReplicationId = lastReplicationId;
+    }
   }
 
-  private String loadAndVerify(String dbName) throws IOException {
+  private Tuple loadAndVerify(String dbName) throws IOException {
     advanceDumpDir();
     run("REPL DUMP " + dbName);
     String dumpLocation = getResult(0, 0);
@@ -170,10 +169,9 @@ private String loadAndVerify(String dbName) throws IOException {
     printOutput();
     run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
     verifyRun("REPL STATUS " + replicatedDbName, lastReplicationId);
-    return replicatedDbName;
+    return new Tuple(replicatedDbName, lastReplicationId);
   }
 
-
   /**
    * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
    * Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -211,7 +209,7 @@ public void testBasic() throws IOException {
     verifySetup("SELECT a from " + dbName + ".ptned_empty", empty);
     verifySetup("SELECT * from " + dbName + ".unptned_empty", empty);
 
-    String replicatedDbName = loadAndVerify(dbName);
+    String replicatedDbName = loadAndVerify(dbName).replicatedDbName;
 
     verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data);
     verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1);
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
new file mode 100644
index 0000000000..6f7effbe3b
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestReplicationScenariosAcrossInstances {
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+
+  private static WareHouse primary, replica;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    primary = new WareHouse();
+    replica = new WareHouse();
+  }
+
+  private String primaryDbName, replicatedDbName;
+
+  @Before
+  public void setup() throws Throwable {
+    primaryDbName = testName.getMethodName() + "_" +
+        System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run(dropCommand(primaryDbName));
+    replica.run(dropCommand(replicatedDbName));
+  }
+
+  private String dropCommand(String dbName) {
+    return "drop database if exists " + dbName + " cascade ";
+  }
+
+  @Test
+  public void testIncrementalFunctionReplication() throws Throwable {
+    WareHouse.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(bootStrapDump.lastReplicationId);
+
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+
+    WareHouse.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+        .run("REPL STATUS " + replicatedDbName)
+        .verify(incrementalDump.lastReplicationId)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
+        .verify(replicatedDbName + ".testFunction");
+  }
+
+  @Test
+  public void testBootstrapFunctionReplication() throws Throwable {
+    primary.run("CREATE FUNCTION " + primaryDbName
+        + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' "
+        + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'");
+    WareHouse.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+        .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'")
+        .verify(replicatedDbName + ".testFunction");
+  }
+}
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WareHouse.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WareHouse.java
new file mode 100644
index 0000000000..c643aefb3a
--- /dev/null
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WareHouse.java
@@ -0,0 +1,173 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+class WareHouse {
+  private Driver driver;
+  private HiveMetaStoreClient client;
+  private HiveConf hconf;
+
+  private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
+
+  /**
+   * Used to allow the primary and replica warehouse to share the same instance of the hive
+   * server.
+   */
+  WareHouse(WareHouse other) {
+    this.driver = other.driver;
+    this.client = other.client;
+    this.hconf = other.hconf;
+  }
+
+  WareHouse() throws Exception {
+    hconf = new HiveConf(TestReplicationScenarios.class);
+    String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
+    String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
+        + Path.SEPARATOR
+        + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
+        + "_"
+        + System.currentTimeMillis();
+
+    if (metaStoreUri != null) {
+      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+      // useExternalMS = true;
+      return;
+    }
+
+    // turn on db notification listener on meta store
+    hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
+    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/");
+    int metaStorePort = MetaStoreUtils.startMetaStore(hconf);
+    hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/");
+    hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort);
+    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+    Path testPath = new Path(hiveWarehouseLocation);
+    FileSystem fs = FileSystem.get(testPath.toUri(), hconf);
+    fs.mkdirs(testPath);
+
+    driver = new Driver(hconf);
+    SessionState.start(new CliSessionState(hconf));
+    client = new HiveMetaStoreClient(hconf);
+  }
+
+  private int next = 0;
+
+  private void advanceDumpDir() {
+    next++;
+    ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+  }
+
+  private ArrayList<String> lastResults;
+
+  private String row0Result(int colNum, boolean reuse) throws IOException {
+    if (!reuse) {
+      lastResults = new ArrayList<>();
+      try {
+        driver.getResults(lastResults);
+      } catch (CommandNeedRetryException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+    }
+    // Split around the 'tab' character
+    return (lastResults.get(0).split("\\t"))[colNum];
+  }
+
+  WareHouse run(String command) throws Throwable {
+    CommandProcessorResponse ret = driver.run(command);
+    if (ret.getException() != null) {
+      throw ret.getException();
+    }
+    return this;
+  }
+
+  Tuple dump(String dbName, String lastReplicationId) throws Throwable {
+    advanceDumpDir();
+    String dumpCommand =
+        "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " + lastReplicationId);
+    run(dumpCommand);
+    String dumpLocation = row0Result(0, false);
+    String lastDumpId = row0Result(1, true);
+    return new Tuple(dumpLocation, lastDumpId);
+  }
+
+  WareHouse load(String replicatedDbName, String dumpLocation) throws Throwable {
+    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    printOutput();
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    return this;
+  }
+
+  WareHouse verify(String data) throws IOException {
+    verifyResults(new String[] { data });
+    return this;
+  }
+
+  /**
+   * Results read back from hive output do not preserve case and come back in lower case, so we
+   * compare against lower-cased expected values. NULL values, however, are returned in upper
+   * case, hence both sides are explicitly lower-cased before the assert.
+   */
+  private void verifyResults(String[] data) throws IOException {
+    List<String> results = getOutput();
+    TestReplicationScenariosAcrossInstances.LOG.info("Expecting {}", data);
+    TestReplicationScenariosAcrossInstances.LOG.info("Got {}", results);
+    assertEquals(data.length, results.size());
+    for (int i = 0; i < data.length; i++) {
+      assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
+    }
+  }
+
+  List<String> getOutput() throws IOException {
+    List<String> results = new ArrayList<>();
+    try {
+      driver.getResults(results);
+    } catch (CommandNeedRetryException e) {
+      TestReplicationScenariosAcrossInstances.LOG.warn(e.getMessage(), e);
+      throw new RuntimeException(e);
+    }
+    return results;
+  }
+
+  private void printOutput() throws IOException {
+    for (String s : getOutput()) {
+      TestReplicationScenariosAcrossInstances.LOG.info(s);
+    }
+  }
+
+  static class Tuple {
+    final String dumpLocation;
+    final String lastReplicationId;
+
+    Tuple(String dumpLocation, String lastReplicationId) {
+      this.dumpLocation = dumpLocation;
+      this.lastReplicationId = lastReplicationId;
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index a9384be707..47bebc8b12 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -263,7 +263,7 @@ public static void createExportDump(FileSystem fs, Path metadataPath,
     }
   }
 
-  static MetaData readMetaData(FileSystem fs, Path metadataPath)
+  public static MetaData readMetaData(FileSystem fs, Path metadataPath)
       throws IOException, SemanticException {
     String message = readAsString(fs, metadataPath);
     try {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5d1d2fdbea..ba7dfb17d4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -54,7 +54,6 @@
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
-import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandlerFactory;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
@@ -667,12 +666,12 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
   }
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName, String locn, Task<? extends Serializable> precursor,
+      String dbName, String tblName, String location, Task<? extends Serializable> precursor,
       Map<String, Long> dbsUpdated, Map<String, Long> tablesUpdated, DumpMetaData dmd)
       throws SemanticException {
     MessageHandler.Context context =
-        new MessageHandler.Context(dbName, tblName, locn, precursor, dmd, conf, db, ctx, LOG);
-    MessageHandler messageHandler = MessageHandlerFactory.handlerFor(dmd.getDumpType());
+        new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG);
+    MessageHandler messageHandler = dmd.getDumpType().handler();
     List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
 
     if (precursor != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index c2cffaa3d9..5fa98088dc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -17,22 +17,110 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl;
 
+import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DefaultHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropPartitionHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.DropTableHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.InsertHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.RenamePartitionHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.RenameTableHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncatePartitionHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler;
+
 public enum DumpType {
-  BOOTSTRAP("BOOTSTRAP"),
-  INCREMENTAL("INCREMENTAL"),
-  EVENT_CREATE_TABLE("EVENT_CREATE_TABLE"),
-  EVENT_ADD_PARTITION("EVENT_ADD_PARTITION"),
-  EVENT_DROP_TABLE("EVENT_DROP_TABLE"),
-  EVENT_DROP_PARTITION("EVENT_DROP_PARTITION"),
-  EVENT_ALTER_TABLE("EVENT_ALTER_TABLE"),
-  EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
-  EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE"),
-  EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
-  EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
-  EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION"),
-  EVENT_INSERT("EVENT_INSERT"),
-  EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION"),
-  EVENT_UNKNOWN("EVENT_UNKNOWN");
+
+  EVENT_CREATE_TABLE("EVENT_CREATE_TABLE") {
+    @Override
+    public MessageHandler handler() {
+      return new TableHandler();
+    }
+  },
+  EVENT_ADD_PARTITION("EVENT_ADD_PARTITION") {
+    @Override
+    public MessageHandler handler() {
+      return new TableHandler();
+    }
+  },
+  EVENT_DROP_TABLE("EVENT_DROP_TABLE") {
+    @Override
+    public MessageHandler handler() {
+      return new DropTableHandler();
+    }
+  },
+  EVENT_DROP_PARTITION("EVENT_DROP_PARTITION") {
+    @Override
+    public MessageHandler handler() {
+      return new DropPartitionHandler();
+    }
+  },
+  EVENT_ALTER_TABLE("EVENT_ALTER_TABLE") {
+    @Override
+    public MessageHandler handler() {
+      return new TableHandler();
+    }
+  },
+  EVENT_RENAME_TABLE("EVENT_RENAME_TABLE") {
+    @Override
+    public MessageHandler handler() {
+      return new RenameTableHandler();
+    }
+  },
+  EVENT_TRUNCATE_TABLE("EVENT_TRUNCATE_TABLE") {
+    @Override
+    public MessageHandler handler() {
+      return new TruncateTableHandler();
+    }
+  },
+  EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION") {
+    @Override
+    public MessageHandler handler() {
+      return new TableHandler();
+    }
+  },
+  EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION") {
+    @Override
+    public MessageHandler handler() {
+      return new RenamePartitionHandler();
+    }
+  },
+  EVENT_TRUNCATE_PARTITION("EVENT_TRUNCATE_PARTITION") {
+    @Override
+    public MessageHandler handler() {
+      return new TruncatePartitionHandler();
+    }
+  },
+  EVENT_INSERT("EVENT_INSERT") {
+    @Override
+    public MessageHandler handler() {
+      return new InsertHandler();
+    }
+  },
+  EVENT_CREATE_FUNCTION("EVENT_CREATE_FUNCTION") {
+    @Override
+    public MessageHandler handler() {
+      return new CreateFunctionHandler();
+    }
+  },
+  EVENT_UNKNOWN("EVENT_UNKNOWN") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
+  BOOTSTRAP("BOOTSTRAP") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  },
+  INCREMENTAL("INCREMENTAL") {
+    @Override
+    public MessageHandler handler() {
+      return new DefaultHandler();
+    }
+  };
 
   String type = null;
 
   DumpType(String type) {
@@ -43,4 +131,6 @@ public String toString(){
     return type;
   }
 
+  public abstract MessageHandler handler();
+
 }
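Note (illustration, not part of the patch): the enum now owns the event-to-handler mapping, replacing the reflection-based MessageHandlerFactory lookup deleted further below. A sketch of the new load-side dispatch, assuming dmd is the DumpMetaData parsed from an event's dump directory:

    MessageHandler messageHandler = dmd.getDumpType().handler();
    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);

Because handler() is abstract, adding a DumpType constant without declaring its load-side handler becomes a compile error rather than a silent runtime fallback to DefaultHandler.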
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
similarity index 88%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index ba699e3ed4..a70c673e8e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -23,13 +23,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract class AbstractHandler implements EventHandler {
-  static final Logger LOG = LoggerFactory.getLogger(AbstractHandler.class);
+abstract class AbstractEventHandler implements EventHandler {
+  static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class);
 
   final NotificationEvent event;
   final MessageDeserializer deserializer;
 
-  AbstractHandler(NotificationEvent event) {
+  AbstractEventHandler(NotificationEvent event) {
     this.event = event;
     deserializer = MessageFactory.getInstance().getDeserializer();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index f4239e5b07..52d136fde0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -37,7 +37,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
-class AddPartitionHandler extends AbstractHandler {
+class AddPartitionHandler extends AbstractEventHandler {
   protected AddPartitionHandler(NotificationEvent notificationEvent) {
     super(notificationEvent);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index 8a7e742b72..a9db135dc2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -32,7 +32,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AlterPartitionHandler extends AbstractHandler {
+class AlterPartitionHandler extends AbstractEventHandler {
   private final org.apache.hadoop.hive.metastore.api.Partition after;
   private final org.apache.hadoop.hive.metastore.api.Table tableObject;
   private final boolean isTruncateOp;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index f457f23f34..ab9a9de373 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -27,7 +27,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class AlterTableHandler extends AbstractHandler {
+class AlterTableHandler extends AbstractEventHandler {
   private final org.apache.hadoop.hive.metastore.api.Table after;
   private final boolean isTruncateOp;
  private final Scenario scenario;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
index bebf0358e8..48982465fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java
@@ -10,7 +10,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
-class CreateFunctionHandler extends AbstractHandler {
+class CreateFunctionHandler extends AbstractEventHandler {
   CreateFunctionHandler(NotificationEvent event) {
     super(event);
   }
@@ -27,6 +27,7 @@ public void handle(Context withinContext) throws Exception {
       new FunctionSerializer(createFunctionMessage.getFunctionObj())
           .writeTo(jsonWriter, withinContext.replicationSpec);
     }
+    withinContext.createDmd(this).write();
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index ca3607f77a..8737d502ee 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -30,7 +30,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
-class CreateTableHandler extends AbstractHandler {
+class CreateTableHandler extends AbstractEventHandler {
 
   CreateTableHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
index 0d4665a453..8977f62963 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DefaultHandler.java
@@ -23,7 +23,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DefaultHandler extends AbstractHandler {
+class DefaultHandler extends AbstractEventHandler {
 
   DefaultHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
index a4eacc4024..19b704411b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropPartitionHandler.java
@@ -23,7 +23,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropPartitionHandler extends AbstractHandler {
+class DropPartitionHandler extends AbstractEventHandler {
 
   DropPartitionHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index 40cd5cb4f7..cce0192d43 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -23,7 +23,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class DropTableHandler extends AbstractHandler {
+class DropTableHandler extends AbstractEventHandler {
 
   DropTableHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 0393701fae..f514fb24bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -36,7 +36,7 @@
 
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
-class InsertHandler extends AbstractHandler {
+class InsertHandler extends AbstractEventHandler {
 
   InsertHandler(NotificationEvent event) {
     super(event);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index 12ad19b2ea..2d5e3b168b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 
 import java.io.BufferedReader;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
new file mode 100644
index 0000000000..8b6179b072
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -0,0 +1,68 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.FunctionUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class CreateFunctionHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    try {
+      FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf);
+      MetaData metadata;
+      try {
+        metadata = EximUtil.readMetaData(fs, new Path(context.location, EximUtil.METADATA_NAME));
+      } catch (IOException e) {
+        throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+      }
+
+      String dbName = context.isDbNameEmpty() ? metadata.function.getDbName() : context.dbName;
+      CreateFunctionDesc desc = new CreateFunctionDesc(
+          FunctionUtils.qualifyFunctionName(metadata.function.getFunctionName(), dbName), false,
+          metadata.function.getClassName(), metadata.function.getResourceUris()
+      );
+
+      Task<FunctionWork> task = TaskFactory.get(new FunctionWork(desc), context.hiveConf);
+      context.log.debug("Added create function task : {}:{},{}", task.getId(),
+          metadata.function.getFunctionName(), metadata.function.getClassName());
+      databasesUpdated.put(dbName, context.dmd.getEventTo());
+      return Collections.singletonList(task);
+    } catch (Exception e) {
+      throw (e instanceof SemanticException)
+          ? (SemanticException) e
+          : new SemanticException("Error reading message members", e);
+    }
+  }
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
index 6d346b6fe0..64e65180e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DefaultHandler.java
@@ -24,7 +24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-class DefaultHandler extends AbstractMessageHandler {
+public class DefaultHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context withinContext)
       throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 73f261369a..131d672b15 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -38,7 +38,7 @@
 import java.util.List;
 import java.util.Map;
 
-class DropPartitionHandler extends AbstractMessageHandler {
+public class DropPartitionHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
index b623f2f939..943a6a6665 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropTableHandler.java
@@ -28,7 +28,7 @@
 import java.util.Collections;
 import java.util.List;
 
-class DropTableHandler extends AbstractMessageHandler {
+public class DropTableHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
index fa63169b7d..40ed0b27fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -24,7 +24,7 @@
 import java.io.Serializable;
 import java.util.List;
 
-class InsertHandler extends AbstractMessageHandler {
+public class InsertHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context withinContext)
       throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
deleted file mode 100644
index de6ff7464b..0000000000
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandlerFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.parse.repl.load.message;
-
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Modifier;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
-public class MessageHandlerFactory {
-  private static Map<DumpType, Class<? extends MessageHandler>> messageHandlers = new HashMap<>();
-
-  static {
-    register(DumpType.EVENT_DROP_PARTITION, DropPartitionHandler.class);
-    register(DumpType.EVENT_DROP_TABLE, DropTableHandler.class);
-    register(DumpType.EVENT_INSERT, InsertHandler.class);
-    register(DumpType.EVENT_RENAME_PARTITION, RenamePartitionHandler.class);
-    register(DumpType.EVENT_RENAME_TABLE, RenameTableHandler.class);
-
-    register(DumpType.EVENT_CREATE_TABLE, TableHandler.class);
-    register(DumpType.EVENT_ADD_PARTITION, TableHandler.class);
-    register(DumpType.EVENT_ALTER_TABLE, TableHandler.class);
-    register(DumpType.EVENT_ALTER_PARTITION, TableHandler.class);
-
-    register(DumpType.EVENT_TRUNCATE_PARTITION, TruncatePartitionHandler.class);
-    register(DumpType.EVENT_TRUNCATE_TABLE, TruncateTableHandler.class);
-  }
-
-  private static void register(DumpType eventType, Class<? extends MessageHandler> handlerClazz) {
-    try {
-      Constructor<? extends MessageHandler> constructor =
-          handlerClazz.getDeclaredConstructor();
-      assert constructor != null;
-      assert !Modifier.isPrivate(constructor.getModifiers());
-      messageHandlers.put(eventType, handlerClazz);
-    } catch (NoSuchMethodException e) {
-      throw new IllegalArgumentException("handler class: " + handlerClazz.getCanonicalName()
-          + " does not have the a constructor with only parameter of type:"
-          + NotificationEvent.class.getCanonicalName(), e);
-    }
-  }
-
-  public static MessageHandler handlerFor(DumpType eventType) {
-    if (messageHandlers.containsKey(eventType)) {
-      Class<? extends MessageHandler> handlerClazz = messageHandlers.get(eventType);
-      try {
-        Constructor<? extends MessageHandler> constructor =
-            handlerClazz.getDeclaredConstructor();
-        return constructor.newInstance();
-      } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
-        // this should never happen. however we want to make sure we propagate the exception
-        throw new RuntimeException(
-            "failed when creating handler for " + eventType
-                + " with the responsible class being " + handlerClazz.getCanonicalName(), e);
-      }
-    }
-    return new DefaultHandler();
-  }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 658f2baa6a..627fb46eef 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -33,7 +33,7 @@
 import java.util.List;
 import java.util.Map;
 
-class RenamePartitionHandler extends AbstractMessageHandler {
+public class RenamePartitionHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index 2c429c169e..10f07532e3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -28,7 +28,7 @@
 import java.util.Collections;
 import java.util.List;
 
-class RenameTableHandler extends AbstractMessageHandler {
+public class RenameTableHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 2db83853c0..09d70ebe42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -27,7 +27,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 
-class TableHandler extends AbstractMessageHandler {
+public class TableHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load it in as needed.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
index 5436f0d486..fe457883a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncatePartitionHandler.java
@@ -32,7 +32,7 @@
 import java.util.List;
 import java.util.Map;
 
-class TruncatePartitionHandler extends AbstractMessageHandler {
+public class TruncatePartitionHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
     AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
index 731383cfa5..fc024f1257 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TruncateTableHandler.java
@@ -28,7 +28,7 @@
 import java.util.Collections;
 import java.util.List;
 
-class TruncateTableHandler extends AbstractMessageHandler {
+public class TruncateTableHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
     AlterTableMessage msg = deserializer.getAlterTableMessage(context.dmd.getPayload());
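Note (illustration, not part of the patch): with the load-side handlers made public and the mapping expressed directly on the enum, the wiring can be asserted without reflection; a hypothetical JUnit check:

    assertTrue(DumpType.EVENT_CREATE_FUNCTION.handler() instanceof CreateFunctionHandler);
    assertTrue(DumpType.EVENT_TRUNCATE_TABLE.handler() instanceof TruncateTableHandler);
    assertTrue(DumpType.EVENT_UNKNOWN.handler() instanceof DefaultHandler);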