diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2dfc8b6f89..5344f36887 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -449,6 +449,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal REPLCMINTERVAL("hive.repl.cm.interval","3600s", new TimeValidator(TimeUnit.SECONDS), "Inteval for cmroot cleanup thread."), + REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/", + "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml index 8adf309430..ba9d7b947e 100644 --- itests/hive-unit/pom.xml +++ itests/hive-unit/pom.xml @@ -393,6 +393,12 @@ ${curator.version} test + + org.hamcrest + hamcrest-library + ${hamcrest.version} + test + diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 3f9eec3e35..1495c1a8a1 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -47,6 +47,9 @@ import com.google.common.collect.ImmutableMap; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class TestReplChangeManager { private static HiveMetaStoreClient client; private static HiveConf hiveConf; @@ -163,28 +166,28 @@ public void testRecyclePartTable() throws Exception { createFile(part3Path, "p3"); String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs); - Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path)); - Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path)); - Assert.assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path)); + assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path)); + assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path)); + assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path)); ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf); // verify cm.recycle(db, table, part) api moves file to cmroot dir int ret = cm.recycle(part1Path, false); Assert.assertEquals(ret, 1); - Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Chksum); - Assert.assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path)); + Path cmPart1Path = ReplChangeManager.getCMPath(hiveConf, path1Chksum); + assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path)); // Verify dropPartition recycle part files client.dropPartition(dbName, tblName, Arrays.asList("20160102")); - Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path)); - Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Chksum); - Assert.assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path)); + assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path)); + Path cmPart2Path = ReplChangeManager.getCMPath(hiveConf, path2Chksum); + assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path)); // Verify dropTable recycle partition files client.dropTable(dbName, 
tblName); - Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path)); - Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Chksum); - Assert.assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path)); + assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path)); + Path cmPart3Path = ReplChangeManager.getCMPath(hiveConf, path3Chksum); + assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path)); client.dropDatabase(dbName, true, true); } @@ -233,28 +236,28 @@ public void testRecycleNonPartTable() throws Exception { createFile(filePath3, "f3"); String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs); - Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1)); - Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2)); - Assert.assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3)); + assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1)); + assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2)); + assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3)); ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf); // verify cm.recycle(Path) api moves file to cmroot dir cm.recycle(filePath1, false); - Assert.assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1)); + assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1)); - Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileChksum1); - Assert.assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1)); + Path cmPath1 = ReplChangeManager.getCMPath(hiveConf, fileChksum1); + assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1)); // Verify dropTable recycle table files client.dropTable(dbName, tblName); - Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileChksum2); - Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2)); - Assert.assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2)); + Path cmPath2 = ReplChangeManager.getCMPath(hiveConf, fileChksum2); + assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2)); + assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2)); - Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileChksum3); - Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3)); - Assert.assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3)); + Path cmPath3 = ReplChangeManager.getCMPath(hiveConf, fileChksum3); + assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3)); + assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3)); client.dropDatabase(dbName, true, true); } @@ -294,17 +297,17 @@ public void testClearer() throws Exception { ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl3, true); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11))); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12))); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21))); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22))); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31))); - Assert.assertTrue(fs.exists(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum11))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, 
fileChksum12))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum21))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum22))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum31))); + assertTrue(fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum32))); - fs.setTimes(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31), now - 86400*1000*2, now - 86400*1000*2); - fs.setTimes(ReplChangeManager.getCMPath(part32, hiveConf, fileChksum32), now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum11), now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum21), now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum31), now - 86400*1000*2, now - 86400*1000*2); + fs.setTimes(ReplChangeManager.getCMPath(hiveConf, fileChksum32), now - 86400*1000*2, now - 86400*1000*2); ReplChangeManager.scheduleCMClearer(hiveConf); @@ -317,14 +320,22 @@ public void testClearer() throws Exception { if (end - start > 5000) { Assert.fail("timeout, cmroot has not been cleared"); } - if (!fs.exists(ReplChangeManager.getCMPath(part11, hiveConf, fileChksum11)) && - fs.exists(ReplChangeManager.getCMPath(part12, hiveConf, fileChksum12)) && - !fs.exists(ReplChangeManager.getCMPath(part21, hiveConf, fileChksum21)) && - fs.exists(ReplChangeManager.getCMPath(part22, hiveConf, fileChksum22)) && - !fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31)) && - !fs.exists(ReplChangeManager.getCMPath(part31, hiveConf, fileChksum31))) { + if (!fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum11)) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum12)) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum21)) && + fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum22)) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum31)) && + !fs.exists(ReplChangeManager.getCMPath(hiveConf, fileChksum31))) { cleared = true; } } while (!cleared); } + + @Test + public void shouldIdentifyCMURIs() { + assertTrue(ReplChangeManager + .isCMFileUri(new Path("hdfs://localhost:90000/somepath/adir/", "ab.jar#e239s2233"), fs)); + assertFalse(ReplChangeManager + .isCMFileUri(new Path("/somepath/adir/", "ab.jar"), fs)); + } } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 3c1ef082cf..6713dff73e 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -1,23 +1,30 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at +

+ http://www.apache.org/licenses/LICENSE-2.0 +

+ Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.ql.parse; -import org.junit.After; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.apache.hadoop.hive.ql.util.DependencyResolver; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -27,44 +34,56 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; public class TestReplicationScenariosAcrossInstances { @Rule public final TestName testName = new TestName(); @Rule - public TestRule replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList()); + public TestRule replV1BackwardCompat; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); private static WarehouseInstance primary, replica; - @BeforeClass - public static void classLevelSetup() throws Exception { - primary = new WarehouseInstance(); - replica = new WarehouseInstance(); - } + @BeforeClass + public static void classLevelSetup() throws Exception { + Configuration conf = new Configuration(); + conf.set("dfs.client.use.datanode.hostname", "true"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + primary = new WarehouseInstance(LOG, miniDFSCluster); + replica = new WarehouseInstance(LOG, miniDFSCluster); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } private String primaryDbName, replicatedDbName; @Before public void setup() throws Throwable { + replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; primary.run("create database " + primaryDbName); } - @After - public void tearDown() throws Throwable { - primary.run(dropCommand(primaryDbName)); - replica.run(dropCommand(replicatedDbName)); - } - - private String dropCommand(String dbName) { - return "drop database if exists " + dbName + " cascade "; - } - @Test public void testCreateFunctionIncrementalReplication() throws Throwable { WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); @@ -73,10 +92,11 @@ public void testCreateFunctionIncrementalReplication() throws Throwable { .verify(bootStrapDump.lastReplicationId); primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " - + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'"); + + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + "using jar 
'ivy://io.github.myui:hivemall:0.4.0-2'"); - WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + WarehouseInstance.Tuple incrementalDump = + primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verify(incrementalDump.lastReplicationId) @@ -96,7 +116,8 @@ public void testDropFunctionIncrementalReplication() throws Throwable { primary.run("Drop FUNCTION " + primaryDbName + ".testFunction "); - WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + WarehouseInstance.Tuple incrementalDump = + primary.dump(primaryDbName, bootStrapDump.lastReplicationId); replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verify(incrementalDump.lastReplicationId) @@ -107,8 +128,8 @@ public void testDropFunctionIncrementalReplication() throws Throwable { @Test public void testBootstrapFunctionReplication() throws Throwable { primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " - + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'"); + + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'"); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) @@ -116,4 +137,71 @@ public void testBootstrapFunctionReplication() throws Throwable { .verify(replicatedDbName + ".testFunction"); } + @Test + public void testCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable { + Dependencies dependencies = dependencies("ivy://io.github.myui:hivemall:0.4.0-2", primary); + String jarSubString = dependencies.toJarSubSql(); + + primary.run("CREATE FUNCTION " + primaryDbName + + ".anotherFunction as 'hivemall.tools.string.StopwordUDF' " + + "using " + jarSubString); + + WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "*'") + .verify(replicatedDbName + ".anotherFunction"); + + FileStatus[] fileStatuses = replica.miniDFSCluster.getFileSystem().globStatus( + new Path( + replica.functionsRoot + "/" + replicatedDbName.toLowerCase() + "/anotherfunction/*/*") + , path -> path.toString().endsWith("jar")); + List expectedDependenciesNames = dependencies.jarNames(); + assertThat(fileStatuses.length, is(equalTo(expectedDependenciesNames.size()))); + List jars = Arrays.stream(fileStatuses).map(f -> { + String[] splits = f.getPath().toString().split("/"); + return splits[splits.length - 1]; + }).collect(Collectors.toList()); + + assertThat(jars, containsInAnyOrder(expectedDependenciesNames.toArray())); + } + + static class Dependencies { + private final List fullQualifiedJarPaths; + + Dependencies(List fullQualifiedJarPaths) { + this.fullQualifiedJarPaths = fullQualifiedJarPaths; + } + + private String toJarSubSql() { + return StringUtils.join( + fullQualifiedJarPaths.stream().map(p -> "jar '" + p + "'").collect(Collectors.toList()), + "," + ); + } + + private List jarNames() { + return fullQualifiedJarPaths.stream().map(p -> { + String[] splits = p.toString().split("/"); + return splits[splits.length - 1]; + }).collect(Collectors.toList()); + } + } + + private Dependencies dependencies(String ivyPath, WarehouseInstance onWarehouse) + throws 
IOException, URISyntaxException, SemanticException { + List localUris = new DependencyResolver().downloadDependencies(new URI(ivyPath)); + List remotePaths = onWarehouse.copyToHDFS(localUris); + List collect = + remotePaths.stream().map(r -> { + try { + return PathBuilder + .fullyQualifiedHDFSUri(r, onWarehouse.miniDFSCluster.getFileSystem()); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + return new Dependencies(collect); + } } diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 7271eaea72..a35f7b20b4 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -1,102 +1,131 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at +

+ http://www.apache.org/licenses/LICENSE-2.0 +

+ Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.hadoop.hive.ql.parse; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule; import org.apache.hive.hcatalog.listener.DbNotificationListener; -import org.junit.rules.TestRule; +import org.codehaus.plexus.util.ExceptionUtils; +import org.slf4j.Logger; +import java.io.Closeable; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; -class WarehouseInstance { +class WarehouseInstance implements Closeable { + final String functionsRoot; + private Logger logger; private Driver driver; + private HiveConf hiveConf; + MiniDFSCluster miniDFSCluster; private HiveMetaStoreClient client; - private HiveConf hconf; - private static int schemaNameCounter = 0; + private static int uniqueIdentifier = 0; + private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName(); - /** - * This will be used to allow the primary and replica warehouse to be the same instance of - * hive server - */ - WarehouseInstance(WarehouseInstance other){ - this.driver = other.driver; - this.client = other.client; - this.hconf = other.hconf; + WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception { + this.logger = logger; + this.miniDFSCluster = cluster; + assert miniDFSCluster.isClusterUp(); + assert miniDFSCluster.isDataNodeUp(); + DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + + Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier); + this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString(); + initialize(cmRootPath.toString()); } - WarehouseInstance() throws Exception { - hconf = new HiveConf(TestReplicationScenarios.class); + private void initialize(String cmRoot) throws Exception { + hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); String metaStoreUri = System.getProperty("test." 
+ HiveConf.ConfVars.METASTOREURIS.varname); String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + TestReplicationScenarios.class.getCanonicalName().replace('.', '_') + "_" + System.nanoTime(); - if (metaStoreUri != null) { - hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); - // useExternalMS = true; + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri); return; } // turn on db notification listener on meta store - hconf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS); - hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); - hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); - hconf.setVar(HiveConf.ConfVars.REPLCMDIR, hiveWarehouseLocation + "/cmroot/"); - String schemaName = "APP" + schemaNameCounter++; + hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS); + hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); + hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); + hiveConf.setVar(HiveConf.ConfVars.REPLCMDIR, cmRoot); + hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot); + String schemaName = "APP" + uniqueIdentifier; System.setProperty("datanucleus.mapping.Schema", schemaName); - hconf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, + hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:memory:${test.tmp.dir}/" + schemaName + ";create=true"); - int metaStorePort = MetaStoreUtils.startMetaStore(hconf); - hconf.setVar(HiveConf.ConfVars.REPLDIR, hiveWarehouseLocation + "/hrepl/"); - hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort); - hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); - hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + + int metaStorePort = MetaStoreUtils.startMetaStore(hiveConf); + hiveConf.setVar(HiveConf.ConfVars.REPLDIR, + hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/"); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + metaStorePort); + hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); Path testPath = new Path(hiveWarehouseLocation); - FileSystem fs = FileSystem.get(testPath.toUri(), hconf); - fs.mkdirs(testPath); + FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf); + testPathFileSystem.mkdirs(testPath); + + driver = new Driver(hiveConf); + SessionState.start(new CliSessionState(hiveConf)); + client = new HiveMetaStoreClient(hiveConf); + // change the value for the next instance. 
+ ++uniqueIdentifier; + } - driver = new Driver(hconf); - SessionState.start(new CliSessionState(hconf)); - client = new HiveMetaStoreClient(hconf); + private Path mkDir(DistributedFileSystem fs, String pathString) + throws IOException, SemanticException { + Path path = new Path(pathString); + fs.mkdir(path, new FsPermission("777")); + return PathBuilder.fullyQualifiedHDFSUri(path, fs); } private int next = 0; @@ -161,8 +190,8 @@ WarehouseInstance verify(String data) throws IOException { */ private void verifyResults(String[] data) throws IOException { List results = getOutput(); - TestReplicationScenariosAcrossInstances.LOG.info("Expecting {}", data); - TestReplicationScenariosAcrossInstances.LOG.info("Got {}", results); + logger.info("Expecting {}", StringUtils.join(data, ",")); + logger.info("Got {}", results); assertEquals(data.length, results.size()); for (int i = 0; i < data.length; i++) { assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase()); @@ -174,7 +203,7 @@ private void verifyResults(String[] data) throws IOException { try { driver.getResults(results); } catch (CommandNeedRetryException e) { - TestReplicationScenariosAcrossInstances.LOG.warn(e.getMessage(), e); + logger.warn(e.getMessage(), e); throw new RuntimeException(e); } return results; @@ -182,12 +211,45 @@ private void verifyResults(String[] data) throws IOException { private void printOutput() throws IOException { for (String s : getOutput()) { - TestReplicationScenariosAcrossInstances.LOG.info(s); + logger.info(s); } } - public ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip){ - return new ReplicationV1CompatRule(client,hconf,testsToSkip); + ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip) { + return new ReplicationV1CompatRule(client, hiveConf, testsToSkip); + + } + + @Override + public void close() throws IOException { + if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) { + miniDFSCluster.shutdown(); + } + } + + List copyToHDFS(List localUris) throws IOException, SemanticException { + DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + Path destinationBasePath = new Path("/", String.valueOf(System.nanoTime())); + mkDir(fs, destinationBasePath.toString()); + localUris.forEach(uri -> { + Path localPath = new Path(uri); + try { + FileSystem localFs = localPath.getFileSystem(hiveConf); + boolean success = FileUtils + .copy(localFs, localPath, fs, destinationBasePath, false, false, hiveConf); + if (!success) { + fail("FileUtils could not copy local uri " + localPath.toString() + " to hdfs"); + } + } catch (IOException e) { + String message = "error on copy of local uri " + localPath.toString() + " to hdfs"; + logger.error(message, e); + fail(message + ExceptionUtils.getFullStackTrace(e)); + } + }); + + List fileStatuses = + Arrays.asList(fs.globStatus(new Path(destinationBasePath, "*"))); + return fileStatuses.stream().map(FileStatus::getPath).collect(Collectors.toList()); } static class Tuple { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 52bfb2698f..e886540774 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6377,7 +6377,20 @@ public void drop_function(String dbName, String funcName) if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } + // if copy of jar to change management fails 
we fail the metastore transaction, since the + // user might delete the jars on HDFS externally after dropping the function, hence having + // a copy is required to allow incremental replication to work correctly. + if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { + for (ResourceUri uri : func.getResourceUris()) { + if (uri.getUri().toLowerCase().startsWith("hdfs:")) { + wh.addToChangeManagement(new Path(uri.getUri())); + } + } + } + // if the operation on metastore fails, we don't do anything in change management, but fail + // the metastore transaction, as having a copy of the jar in change management is not going + // to cause any problem, the cleaner thread will remove this when this jar expires. ms.dropFunction(dbName, funcName); if (transactionalListeners.size() > 0) { transactionalListenerResponses = @@ -6385,7 +6398,6 @@ public void drop_function(String dbName, String funcName) EventType.DROP_FUNCTION, new DropFunctionEvent(func, true, this)); } - success = ms.commitTransaction(); } finally { if (!success) { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 51e4627a9d..c955470f91 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -100,6 +101,34 @@ public boolean accept(Path p){ } }; + void addFile(Path path) throws MetaException { + if (!enabled) { + return; + } + try { + if (fs.isDirectory(path)) { + throw new IllegalArgumentException(path + " cannot be a directory"); + } + Path cmPath = getCMPath(hiveConf, getChksumString(path, fs)); + boolean copySuccessful = FileUtils + .copy(path.getFileSystem(hiveConf), path, cmPath.getFileSystem(hiveConf), cmPath, false, + false, hiveConf); + if (!copySuccessful) { + LOG.debug("A file with the same content of " + path.toString() + " already exists, ignore"); + } else { + fs.setOwner(cmPath, msUser, msGroup); + try { + fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes()); + } catch (UnsupportedOperationException e) { + LOG.warn("Error setting xattr for " + path.toString()); + } + } + } catch (Exception exception) { + throw new MetaException(StringUtils.stringifyException(exception)); + } + } + + /*** * Move a path into cmroot. If the path is a directory (of a partition, or table if nonpartitioned), * recursively move files inside directory to cmroot. Note the table must be managed table @@ -122,7 +151,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException { count += recycle(file.getPath(), ifPurge); } } else { - Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs)); + Path cmPath = getCMPath(hiveConf, getChksumString(path, fs)); if (LOG.isDebugEnabled()) { LOG.debug("Moving " + path.toString() + " to " + cmPath.toString()); @@ -198,16 +227,15 @@ static public void setCmRoot(Path cmRoot) { * Convert a path of file inside a partition or table (if non-partitioned) * to a deterministic location of cmroot. So user can retrieve the file back * with the original location plus checksum. 
- * @param path original path inside partition or table * @param conf - * @param chksum checksum of the file, can be retrieved by {@link getCksumString} + * @param checkSum checksum of the file, can be retrieved by {@link getCksumString} * @return * @throws IOException * @throws MetaException */ - static public Path getCMPath(Path path, Configuration conf, String chksum) + static Path getCMPath(Configuration conf, String checkSum) throws IOException, MetaException { - String newFileName = chksum; + String newFileName = checkSum; int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT); @@ -215,9 +243,7 @@ static public Path getCMPath(Path path, Configuration conf, String chksum) newFileName = newFileName.substring(0, maxLength-1); } - Path cmPath = new Path(cmroot, newFileName); - - return cmPath; + return new Path(cmroot, newFileName); } /*** @@ -238,14 +264,14 @@ static public FileStatus getFileStatus(Path src, String chksumString, } if (!srcFs.exists(src)) { - return srcFs.getFileStatus(getCMPath(src, conf, chksumString)); + return srcFs.getFileStatus(getCMPath(conf, chksumString)); } String currentChksumString = getChksumString(src, srcFs); if (currentChksumString == null || chksumString.equals(currentChksumString)) { return srcFs.getFileStatus(src); } else { - return srcFs.getFileStatus(getCMPath(src, conf, chksumString)); + return srcFs.getFileStatus(getCMPath(conf, chksumString)); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e)); @@ -283,6 +309,11 @@ static public String encodeFileUri(String fileUriStr, String fileChecksum) { return result; } + public static boolean isCMFileUri(Path fromPath, FileSystem srcFs) { + String[] result = getFileWithChksumFromURI(fromPath.toString()); + return result[1] != null; + } + /** * Thread to clear old files of cmroot recursively */ diff --git metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java index 8134ab2cff..053a0de645 100755 --- metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -206,6 +206,10 @@ public boolean renameDir(Path sourcePath, Path destPath) throws MetaException { return false; } + void addToChangeManagement(Path file) throws MetaException { + cm.addFile(file); + } + public boolean deleteDir(Path f, boolean recursive) throws MetaException { return deleteDir(f, recursive, false); } diff --git pom.xml pom.xml index e3ff84f55f..70518d12c0 100644 --- pom.xml +++ pom.xml @@ -143,7 +143,7 @@ 1.3.166 2.8.0 ${basedir}/${hive.path.to.root}/testutils/hadoop - 1.1 + 1.3 1.1.1 3.3.0 @@ -178,7 +178,7 @@ 2.6.2 2.3 1.3.3 - 1.9.5 + 1.10.19 2.0.0-M5 4.0.29.Final 1.8.1 diff --git ql/pom.xml ql/pom.xml index 40a216bc8f..9c176958a2 100644 --- ql/pom.xml +++ ql/pom.xml @@ -29,6 +29,7 @@ .. 
+ 1.6.6 @@ -735,6 +736,18 @@ ${hamcrest.version} test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito + ${powermock.version} + test + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index f277284ec6..71db33289f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -75,7 +75,27 @@ protected int execute(DriverContext driverContext) { FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - List srcFiles = new ArrayList(); + // This should only be true for copy tasks created from functions, otherwise there should never + // be a CM uri in the from path. + if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) { + String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString()); + Path sourcePath = ReplChangeManager + .getFileStatus(new Path(result[0]), result[1], conf) + .getPath(); + if (FileUtils.copy( + sourcePath.getFileSystem(conf), sourcePath, + dstFs, toPath + , false, false, conf + )) { + return 0; + } else { + console.printError("Failed to copy: '" + fromPath.toString() + "to: '" + toPath.toString() + + "'"); + return 1; + } + } + + List srcFiles = new ArrayList<>(); FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length)); if (! rwork.getReadListFromInput()){ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 1e6b192beb..adcdc12f75 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -47,21 +47,20 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler; import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; -import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; -import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; -import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +76,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; import 
static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; @@ -377,11 +377,13 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticE continue; } - Path functionMetadataRoot = - new Path(new Path(functionsRoot, functionName), FUNCTION_METADATA_DIR_NAME); + Path functionRoot = new Path(functionsRoot, functionName); + Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME); try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) { - new FunctionSerializer(tuple.object).writeTo(jsonWriter, tuple.replicationSpec); + FunctionSerializer serializer = + new FunctionSerializer(tuple.object, conf); + serializer.writeTo(jsonWriter, tuple.replicationSpec); } REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName); } @@ -839,31 +841,23 @@ private void analyzeFunctionLoad(String dbName, FileStatus functionDir, .getValidatedURI(conf, stripQuotes(functionDir.getPath().toUri().toString())); Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); - FileSystem fs = FileSystem.get(fromURI, conf); - inputs.add(toReadEntity(fromPath, conf)); - try { - MetaData metaData = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); - ReplicationSpec replicationSpec = metaData.getReplicationSpec(); - if (replicationSpec.isNoop()) { - // nothing to do here, silently return. - return; - } - CreateFunctionDesc desc = new CreateFunctionDesc( - dbName + "." + metaData.function.getFunctionName(), - false, - metaData.function.getClassName(), - metaData.function.getResourceUris() + CreateFunctionHandler handler = new CreateFunctionHandler(); + List> tasksList = handler.handle( + new MessageHandler.Context( + dbName, null, fromPath.toString(), createDbTask, null, conf, db, + null, LOG) ); - Task currentTask = TaskFactory.get(new FunctionWork(desc), conf); - if (createDbTask != null) { - createDbTask.addDependentTask(currentTask); + tasksList.forEach(task -> { + createDbTask.addDependentTask(task); LOG.debug("Added {}:{} as a precursor of {}:{}", - createDbTask.getClass(), createDbTask.getId(), currentTask.getClass(), - currentTask.getId()); - } - } catch (IOException e) { + createDbTask.getClass(), createDbTask.getId(), task.getClass(), + task.getId()); + + }); + inputs.addAll(handler.readEntities()); + } catch (Exception e) { throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java new file mode 100644 index 0000000000..05b98216ad --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/PathBuilder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; + +/** + * Path builder to stitch together paths with different components might be useful as utils across + * replication semantic analyzer atleast. + */ +public class PathBuilder { + private ArrayList descendants = new ArrayList<>(); + private String basePath; + + public PathBuilder(String basePath) { + this.basePath = basePath; + } + + public PathBuilder addDescendant(String path) { + descendants.add(path); + return this; + } + + public Path build() { + Path result = new Path(this.basePath); + for (String descendant : descendants) { + result = new Path(result, descendant); + } + return result; + } + + public static Path fullyQualifiedHDFSUri(Path input, FileSystem hdfsFileSystem) + throws SemanticException { + URI uri = input.toUri(); + String scheme = hdfsFileSystem.getScheme(); + String authority = hdfsFileSystem.getUri().getAuthority(); + String path = uri.getPath(); + try { + return new Path(new URI(scheme, authority, path, null, null)); + } catch (URISyntaxException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java index d04c7b5a7c..ee3432ccde 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateFunctionHandler.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - + package org.apache.hadoop.hive.ql.parse.repl.dump.events; import org.apache.hadoop.fs.FileSystem; @@ -23,11 +23,10 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; -import org.apache.hadoop.hive.ql.parse.repl.DumpType; - class CreateFunctionHandler extends AbstractEventHandler { CreateFunctionHandler(NotificationEvent event) { super(event); @@ -42,7 +41,7 @@ public void handle(Context withinContext) throws Exception { FileSystem fileSystem = metadataPath.getFileSystem(withinContext.hiveConf); try (JsonWriter jsonWriter = new JsonWriter(fileSystem, metadataPath)) { - new FunctionSerializer(createFunctionMessage.getFunctionObj()) + new FunctionSerializer(createFunctionMessage.getFunctionObj(), withinContext.hiveConf) .writeTo(jsonWriter, withinContext.replicationSpec); } withinContext.createDmd(this).write(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java index 5dc702386e..6c2a4021de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java @@ -17,21 +17,31 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TJSONProtocol; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; public class FunctionSerializer implements JsonWriter.Serializer { - public static final String FIELD_NAME="function"; + public static final String FIELD_NAME = "function"; private Function function; + private HiveConf hiveConf; - public FunctionSerializer(Function function) { + public FunctionSerializer(Function function, HiveConf hiveConf) { + this.hiveConf = hiveConf; this.function = function; } @@ -39,9 +49,33 @@ public FunctionSerializer(Function function) { public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); + List resourceUris = new ArrayList<>(); + for (ResourceUri uri : function.getResourceUris()) { + Path inputPath = new Path(uri.getUri()); + if ("hdfs".equals(inputPath.toUri().getScheme())) { + FileSystem fileSystem = inputPath.getFileSystem(hiveConf); + Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem); + String checkSum = ReplChangeManager.getChksumString(qualifiedUri, fileSystem); + String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum); + resourceUris.add(new ResourceUri(uri.getResourceType(), 
newFileUri)); + } else { + resourceUris.add(uri); + } + } + Function copyObj = new Function(this.function); + if (!resourceUris.isEmpty()) { + assert resourceUris.size() == this.function.getResourceUris().size(); + copyObj.setResourceUris(resourceUris); + } + try { + //This is required otherwise correct work object on repl load wont be created. + writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.REPL_SCOPE.toString(), + "all"); + writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID.toString(), + additionalPropertiesProvider.getCurrentReplicationState()); writer.jsonGenerator - .writeStringField(FIELD_NAME, serializer.toString(function, UTF_8)); + .writeStringField(FIELD_NAME, serializer.toString(copyObj, UTF_8)); } catch (TException e) { throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METASTORE.getMsg(), e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java index fc02dfd528..17d8ab2ef7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetaData.java @@ -37,7 +37,7 @@ public MetaData() { this(null, null, null, new ReplicationSpec(), null); } - MetaData(Database db, Table table, Iterable partitions, + public MetaData(Database db, Table table, Iterable partitions, ReplicationSpec replicationSpec, Function function) { this.db = db; this.table = table; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index 8b6179b072..452f506609 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -18,51 +18,177 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionUtils; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.FunctionWork; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toReadEntity; + public class CreateFunctionHandler extends AbstractMessageHandler { @Override public List> handle(Context context) throws SemanticException { try { - FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf); - MetaData metadata; + FunctionDescBuilder builder = new FunctionDescBuilder(context); + 
CreateFunctionDesc descToLoad = builder.build(); + context.log.debug("Loading function desc : {}", descToLoad.toString()); + Task createTask = TaskFactory.get( + new FunctionWork(descToLoad), context.hiveConf + ); + context.log.debug("Added create function task : {}:{},{}", createTask.getId(), + descToLoad.getFunctionName(), descToLoad.getClassName()); + // This null check is done because the same class is used to handle both incremental and + // bootstrap replication scenarios for create function. When doing bootstrap we do not have an + // event id for this event, only the time at which bootstrap started, hence we pass in a null dmd for + // bootstrap. There should be a better way to do this, but it might require a lot of changes across + // different handlers; unless this turns out to be a common pattern, leaving it here. + if (context.dmd != null) { + databasesUpdated.put(builder.destinationDbName, context.dmd.getEventTo()); + } + readEntitySet.add(toReadEntity(new Path(context.location), context.hiveConf)); + if (builder.replCopyTasks.isEmpty()) { + // repl copy only happens for jars on HDFS, not otherwise. + return Collections.singletonList(createTask); + } else { + /** + * This is how task dependencies work. + * All root tasks are executed in parallel. For bootstrap replication there should be only one root task of creating the db; incremental can have multiple (to be verified). + * A task has children, which are queued for execution after the parent has finished execution. + * A one-to-one dependency can be satisfied by adding children to a given task; do this recursively where the relation holds. + * For many-to-one, create a barrier task that is a child of every item in the 'many' dependencies, and make the 'one' dependency a child of the barrier task. + * Add the 'many' to the parent/root tasks. The execution environment will make sure that the child barrier task does not get executed until all parents of the barrier task are complete, + * which should only happen when the last of them finishes, at which point the child of the barrier task is picked up. + */ + Task barrierTask = + TaskFactory.get(new DependencyCollectionWork(), context.hiveConf); + builder.replCopyTasks.forEach(t -> t.addDependentTask(barrierTask)); + barrierTask.addDependentTask(createTask); + return builder.replCopyTasks; + } + } catch (Exception e) { + throw (e instanceof SemanticException) + ? (SemanticException) e + : new SemanticException("Error reading message members", e); + } + } + + private static class FunctionDescBuilder { + private final Context context; + private final MetaData metadata; + private final String destinationDbName; + private final List> replCopyTasks = new ArrayList<>(); + + private FunctionDescBuilder(Context context) throws SemanticException { + this.context = context; try { + FileSystem fs = FileSystem.get(new Path(context.location).toUri(), context.hiveConf); metadata = EximUtil.readMetaData(fs, new Path(context.location, EximUtil.METADATA_NAME)); } catch (IOException e) { throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); } + destinationDbName = context.isDbNameEmpty() ? metadata.function.getDbName() : context.dbName; + } - String dbName = context.isDbNameEmpty() ? 
metadata.function.getDbName() : context.dbName; - CreateFunctionDesc desc = new CreateFunctionDesc( - FunctionUtils.qualifyFunctionName(metadata.function.getFunctionName(), dbName), false, - metadata.function.getClassName(), metadata.function.getResourceUris() + private CreateFunctionDesc build() { + replCopyTasks.clear(); + PrimaryToReplicaResourceFunction conversionFunction = + new PrimaryToReplicaResourceFunction(context, metadata, destinationDbName); + // We explicitly create immutable lists here as that forces the guava lib to run the transformations + // eagerly rather than lazily. The reason is that the function used for the transformation additionally + // creates the corresponding replCopyTasks, which cannot be evaluated lazily, since the query + // plan needs to be complete before we execute it and must not be modified during execution in the driver. + List transformedUris = ImmutableList.copyOf( + Lists.transform(metadata.function.getResourceUris(), conversionFunction) + ); + replCopyTasks.addAll(conversionFunction.replCopyTasks); + String fullQualifiedFunctionName = FunctionUtils.qualifyFunctionName( + metadata.function.getFunctionName(), destinationDbName ); + return new CreateFunctionDesc( + fullQualifiedFunctionName, false, metadata.function.getClassName(), transformedUris + ); + } + } - Task task = TaskFactory.get(new FunctionWork(desc), context.hiveConf); - context.log.debug("Added create function task : {}:{},{}", task.getId(), - metadata.function.getFunctionName(), metadata.function.getClassName()); - databasesUpdated.put(dbName, context.dmd.getEventTo()); - return Collections.singletonList(task); - } catch (Exception e) { - throw (e instanceof SemanticException) - ? (SemanticException) e - : new SemanticException("Error reading message members", e); + static class PrimaryToReplicaResourceFunction + implements Function { + private final Context context; + private final MetaData metadata; + private final List> replCopyTasks = new ArrayList<>(); + private final String functionsRootDir; + private String destinationDbName; + + PrimaryToReplicaResourceFunction(Context context, MetaData metadata, + String destinationDbName) { + this.context = context; + this.metadata = metadata; + this.destinationDbName = destinationDbName; + this.functionsRootDir = context.hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR); + } + + @Override + public ResourceUri apply(ResourceUri resourceUri) { + try { + return resourceUri.getUri().toLowerCase().startsWith("hdfs:") + ? destinationResourceUri(resourceUri) + : resourceUri; + } catch (IOException | SemanticException e) { + throw new RuntimeException(e); + } + } + + /** + * The destination also includes the current timestamp to randomise the placement of the jar at a given location for a function. + * This is done to allow a CREATE / DROP / CREATE of the same function with the same name and jars but updated + * binaries across the two creates. 
+     */
+    ResourceUri destinationResourceUri(ResourceUri resourceUri)
+        throws IOException, SemanticException {
+      String sourceUri = resourceUri.getUri();
+      String[] split = sourceUri.split(Path.SEPARATOR);
+      PathBuilder pathBuilder = new PathBuilder(functionsRootDir);
+      Path qualifiedDestinationPath = PathBuilder.fullyQualifiedHDFSUri(
+          pathBuilder
+              .addDescendant(destinationDbName.toLowerCase())
+              .addDescendant(metadata.function.getFunctionName().toLowerCase())
+              .addDescendant(String.valueOf(System.nanoTime()))
+              .addDescendant(ReplChangeManager.getFileWithChksumFromURI(split[split.length - 1])[0])
+              .build(),
+          FileSystem.get(context.hiveConf)
+      );
+
+      Task<? extends Serializable> copyTask = ReplCopyTask.getLoadCopyTask(
+          metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath,
+          context.hiveConf
+      );
+      replCopyTasks.add(copyTask);
+      ResourceUri destinationUri =
+          new ResourceUri(resourceUri.getResourceType(), qualifiedDestinationPath.toString());
+      context.log.debug("copy source uri : {} to destination uri: {}", sourceUri, destinationUri);
+      return destinationUri;
    }
  }
}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java
new file mode 100644
index 0000000000..7a87701319
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/PrimaryToReplicaResourceFunctionTest.java
@@ -0,0 +1,87 @@
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler.PrimaryToReplicaResourceFunction;
+import static org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler.Context;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Matchers.any;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ PrimaryToReplicaResourceFunction.class, FileSystem.class, ReplCopyTask.class,
+    System.class })
+public class PrimaryToReplicaResourceFunctionTest {
+
+  private PrimaryToReplicaResourceFunction function;
+  @Mock
+  private HiveConf hiveConf;
+  @Mock
+  private Function functionObj;
+  @Mock
+  private FileSystem mockFs;
+  private static Logger logger =
+      LoggerFactory.getLogger(PrimaryToReplicaResourceFunctionTest.class);
+
+  @Before
+  public void setup() {
+    MetaData metadata = new MetaData(null, null, null, null, functionObj);
+    Context context =
+        new Context("primaryDb", null, null, null, null, hiveConf, null, null, logger);
+    when(hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR))
+        .thenReturn("/someBasePath/withADir/");
+    function = new PrimaryToReplicaResourceFunction(context, metadata, "replicaDbName");
+  }
+
+  @Test
+  public void createDestinationPath() throws IOException, SemanticException, URISyntaxException {
+    mockStatic(FileSystem.class);
+    when(FileSystem.get(any(Configuration.class))).thenReturn(mockFs);
+    when(mockFs.getScheme()).thenReturn("hdfs");
+    when(mockFs.getUri()).thenReturn(new URI("hdfs", "somehost:9000", null, null, null));
+    mockStatic(System.class);
+    // the production code uses System.nanoTime() for the destination path, so that is what is mocked here
+    when(System.nanoTime()).thenReturn(Long.MAX_VALUE);
+    when(functionObj.getFunctionName()).thenReturn("someFunctionName");
+    mockStatic(ReplCopyTask.class);
+    Task mock = mock(Task.class);
+    when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class),
+        any(HiveConf.class))).thenReturn(mock);
+
+    ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR,
+        "hdfs://localhost:9000/user/someplace/ab.jar#e094828883"));
+
+    // the db name and function name are both lowercased when the destination path is built
+    assertThat(resourceUri.getUri(),
+        is(equalTo(
+            "hdfs://somehost:9000/someBasePath/withADir/replicadbname/somefunctionname/" + String
+                .valueOf(Long.MAX_VALUE) + "/ab.jar")));
+  }
+}
\ No newline at end of file
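
A minimal standalone sketch of the many-to-one barrier pattern that the comment in CreateFunctionHandler.handle() describes, using the same TaskFactory.get, DependencyCollectionWork and addDependentTask calls the patch itself relies on. The class and method names below are illustrative only and are not part of the patch.

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

class BarrierPatternSketch {
  /**
   * Wires the 'many' root tasks to a single follow-up task via a barrier task.
   * The barrier runs only after every task in 'many' has finished; 'one' runs after the barrier.
   * Returns the root tasks that would be handed back to the driver.
   */
  static List<Task<? extends Serializable>> wire(
      List<Task<? extends Serializable>> many,
      Task<? extends Serializable> one,
      HiveConf conf) {
    Task<? extends Serializable> barrier =
        TaskFactory.get(new DependencyCollectionWork(), conf);
    many.forEach(t -> t.addDependentTask(barrier)); // barrier waits for all of 'many'
    barrier.addDependentTask(one);                  // 'one' runs only after the barrier
    return many;                                    // 'many' are the root tasks
  }
}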
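
The comment in FunctionDescBuilder.build() argues that ImmutableList.copyOf is needed to force guava's lazy Lists.transform view to run while the plan is still being built. A small self-contained sketch of that lazy-vs-eager behaviour (the class name and counter are illustrative, not part of the patch):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

class EagerTransformSketch {
  public static void main(String[] args) {
    AtomicInteger sideEffects = new AtomicInteger();
    // Lists.transform returns a view: the function runs only when elements are read.
    List<String> lazy = Lists.transform(ImmutableList.of("a", "b"),
        s -> { sideEffects.incrementAndGet(); return s.toUpperCase(); });
    System.out.println(sideEffects.get());               // 0 : nothing transformed yet
    List<String> eager = ImmutableList.copyOf(lazy);     // forces the transformation to run now
    System.out.println(sideEffects.get() + " " + eager); // 2 [A, B]
  }
}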
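
For reference, a sketch of how the replica-side jar location is composed from hive.repl.replica.functions.root.dir, the lowercased db and function names, a timestamp and the jar file name, matching the layout asserted in the test above. It uses plain Hadoop Path composition rather than the patch's PathBuilder helper, and the class and method names are illustrative only.

import org.apache.hadoop.fs.Path;

class ReplicaFunctionPathSketch {
  /** e.g. /user/hive/repl/functions/ + replicadbname/somefunctionname/<nanoTime>/ab.jar */
  static Path destinationFor(String functionsRootDir, String dbName, String functionName,
      String jarFileName) {
    return new Path(
        new Path(
            new Path(
                new Path(functionsRootDir, dbName.toLowerCase()),
                functionName.toLowerCase()),
            String.valueOf(System.nanoTime())),
        jarFileName);
  }
}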