diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
index b5cd1ef..95201ed 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -106,7 +106,16 @@ public class ExecDriver extends Task<MapredWork> implements Serializable {
       SessionState.ResourceType t) {
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
-    Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
+    if (ss == null) {
+      return "";
+    }
+    Set<String> files;
+    // Handle jar resources separately.
+    if (t == SessionState.ResourceType.JAR) {
+      files = ss.listLocalJarResources();
+    } else {
+      files = ss.list_resource(t, null);
+    }
     if (files != null) {
       List<String> realFiles = new ArrayList<String>(files.size());
       for (String one : files) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index a36b06e..afaf4df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hive.ql.session;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.File;
 import java.io.PrintStream;
 import java.net.URL;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
+import java.util.EnumMap;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +36,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
@@ -118,6 +125,11 @@ public class SessionState {
     this.conf = conf;
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
     ls = new LineageState();
+
+    // Initialize resourceMap with an empty map for each resource type.
+    for (ResourceType t : ResourceType.values()) {
+      this.resourceMap.put(t, new HashMap<String, ResourceHook>());
+    }
   }
 
   public void setCmd(String cmdString) {
@@ -291,7 +303,7 @@ public class SessionState {
     return _console;
   }
 
-  public static String validateFile(Set<String> curFiles, String newFile) {
+  public static String validateFile(String newFile) {
     SessionState ss = SessionState.get();
     LogHelper console = getConsole();
     Configuration conf = (ss == null) ? new Configuration() : ss.getConf();
@@ -327,14 +339,14 @@
     }
   }
 
-  public static boolean unregisterJar(String jarsToUnregister) {
+  public static boolean unregisterJar(String jarToUnregister) {
     LogHelper console = getConsole();
     try {
-      Utilities.removeFromClassPath(StringUtils.split(jarsToUnregister, ","));
-      console.printInfo("Deleted " + jarsToUnregister + " from class path");
+      Utilities.removeFromClassPath(new String[] {jarToUnregister});
+      console.printInfo("Deleted " + jarToUnregister + " from class path");
       return true;
     } catch (Exception e) {
-      console.printError("Unable to unregister " + jarsToUnregister
+      console.printError("Unable to unregister " + jarToUnregister
           + "\nException: " + e.getMessage(), "\n"
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
       return false;
@@ -342,59 +354,115 @@
   }
 
   /**
-   * ResourceHook.
-   *
+   * Represents a resource that may be loaded and unloaded.
    */
   public static interface ResourceHook {
-    String preHook(Set<String> cur, String s);
+    /** Returns true on success. postHook() should not be called
+     * unless preHook() was successful.
+     */
+    boolean preHook();
+    boolean postHook();
+  }
+
+  /** Represents a file or an archive. */
+  static class FileResourceHook implements ResourceHook {
+    private final String file;
+
+    FileResourceHook(String file) {
+      this.file = file;
+    }
+
+    public boolean preHook() {
+      return null != validateFile(file);
+    }
+
+    public boolean postHook() {
+      return true;
+    }
-    boolean postHook(Set<String> cur, String s);
   }
 
   /**
-   * ResourceType.
-   *
+   * Jar resources must be loaded into the class loader,
+   * which implies special care. If the resource is remote,
+   * the resource must be copied locally first.
    */
-  public static enum ResourceType {
-    FILE(new ResourceHook() {
-      public String preHook(Set<String> cur, String s) {
-        return validateFile(cur, s);
+  static class JarResourceHook implements ResourceHook {
+    /** Original resource path. */
+    private final String file;
+
+    /** Local path to file, if necessary. */
+    private File localFile = null;
+
+    JarResourceHook(String file) {
+      this.file = file;
     }
 
-      public boolean postHook(Set<String> cur, String s) {
+    public boolean preHook() {
+      String newJar = validateFile(file);
+      if (newJar != null) {
+        HiveConf conf = SessionState.get().getConf();
+        Path p = new Path(newJar);
+        // Compare schemes with equals(); == on Strings compares references.
+        if (!"file".equals(p.toUri().getScheme())) {
+          LogHelper console = getConsole();
+          // The jar is not local; copy it locally first.
+          File scratchDir = new File(conf.getVar(HiveConf.ConfVars.SCRATCHDIR));
+          try {
+            localFile = File.createTempFile(p.getName(), ".jar", scratchDir);
+            console.printInfo("Copying " + newJar + " to " + localFile);
+            FileSystem.get(p.toUri(), conf).copyToLocalFile(p, new Path(localFile.toURI().toString()));
+          } catch (IOException e) {
+            console.printError("Failed to load jar resource: " + e.toString());
+            if (localFile != null) {
+              localFile.delete();
+            }
+            return false;
+          }
+          newJar = localFile.getPath();
+        }
+        if (!registerJar(newJar)) {
+          if (localFile != null) {
+            localFile.delete();
+          }
+          return false;
+        }
         return true;
       }
-    }),
+      return false;
+    }
 
-    JAR(new ResourceHook() {
-      public String preHook(Set<String> cur, String s) {
-        String newJar = validateFile(cur, s);
-        if (newJar != null) {
-          return (registerJar(newJar) ? newJar : null);
+    public boolean postHook() {
+      if (localFile != null) {
+        boolean ret = unregisterJar(localFile.toString());
+        localFile.delete();
+        return ret;
       } else {
-          return null;
+        return unregisterJar(file);
       }
     }
 
-      public boolean postHook(Set<String> cur, String s) {
-        return unregisterJar(s);
+    public String getLocalFile() {
+      return (localFile != null) ? localFile.getPath() : file; // already local if no copy was made
     }
-    }),
-
-    ARCHIVE(new ResourceHook() {
-      public String preHook(Set<String> cur, String s) {
-        return validateFile(cur, s);
   }
 
-      public boolean postHook(Set<String> cur, String s) {
-        return true;
+  /** Types of resources. */
+  public static enum ResourceType {
+    FILE,
+    JAR,
+    ARCHIVE;
+
+    /** Factory method for constructing resource hooks. */
+    public ResourceHook makeHook(String file) {
+      switch (this) {
+      case FILE:
+      case ARCHIVE:
+        return new FileResourceHook(file);
+      case JAR:
+        return new JarResourceHook(file);
+      default:
+        throw new IllegalArgumentException("Unrecognized resource type: " + this);
       }
-    });
-
-    public ResourceHook hook;
-
-    ResourceType(ResourceHook hook) {
-      this.hook = hook;
     }
   };
@@ -421,41 +489,36 @@
     return null;
   }
 
-  private final HashMap<ResourceType, HashSet<String>> resource_map =
-    new HashMap<ResourceType, HashSet<String>>();
+  private final Map<ResourceType, Map<String, ResourceHook>> resourceMap =
+    new EnumMap<ResourceType, Map<String, ResourceHook>>(ResourceType.class);
 
   public void add_resource(ResourceType t, String value) {
-    if (resource_map.get(t) == null) {
-      resource_map.put(t, new HashSet<String>());
-    }
-
-    String fnlVal = value;
-    if (t.hook != null) {
-      fnlVal = t.hook.preHook(resource_map.get(t), value);
-      if (fnlVal == null) {
+    if (resourceMap.get(t).containsKey(value)) {
+      getConsole().printError("Resource " + value + " already added.");
       return;
     }
+    ResourceHook hook = t.makeHook(value);
+    if (hook.preHook()) {
+      resourceMap.get(t).put(value, hook);
     }
-    resource_map.get(t).add(fnlVal);
   }
 
   public boolean delete_resource(ResourceType t, String value) {
-    if (resource_map.get(t) == null) {
-      return false;
-    }
-    if (t.hook != null) {
-      if (!t.hook.postHook(resource_map.get(t), value)) {
-        return false;
+    ResourceHook h = resourceMap.get(t).get(value);
+    if (h != null) {
+      if (h.postHook()) {
+        resourceMap.get(t).remove(value);
+        return true;
       }
     }
-    return (resource_map.get(t).remove(value));
+    return false;
   }
 
   public Set<String> list_resource(ResourceType t, List<String> filter) {
-    if (resource_map.get(t) == null) {
+    if (resourceMap.get(t) == null) {
       return null;
     }
-    Set<String> orig = resource_map.get(t);
+    Set<String> orig = resourceMap.get(t).keySet();
     if (filter == null) {
       return orig;
     } else {
@@ -470,11 +533,12 @@
   }
 
   public void delete_resource(ResourceType t) {
-    if (resource_map.get(t) != null) {
-      for (String value : resource_map.get(t)) {
+    for (String value : new HashSet<String>(resourceMap.get(t).keySet())) { // copy to avoid ConcurrentModificationException
       delete_resource(t, value);
     }
-    resource_map.remove(t);
+    if (!resourceMap.get(t).isEmpty()) {
+      getConsole().printInfo("Force-clearing resource map for resource type: " + t);
+      resourceMap.put(t, new HashMap<String, ResourceHook>());
+    }
   }
 
@@ -485,4 +549,21 @@
   public void setCommandType(String commandType) {
     this.commandType = commandType;
   }
+
+  /**
+   * JAR resources are handled a bit separately, because
+   * the "-libjars" argument requires local paths. Though
+   * the canonical name (as returned by list_resource())
+   * of these is still the path entered by the user,
+   * for internal use, we return the local copies we have created.
+   */
+  public Set<String> listLocalJarResources() {
+    Set<String> ret = new HashSet<String>();
+    for (ResourceHook hook : resourceMap.get(ResourceType.JAR).values()) {
+      JarResourceHook h = (JarResourceHook) hook;
+      ret.add(h.getLocalFile());
+    }
+    return ret;
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/session/TestAddJarFromHDFS.java ql/src/test/org/apache/hadoop/hive/ql/session/TestAddJarFromHDFS.java
new file mode 100644
index 0000000..c19f346
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/session/TestAddJarFromHDFS.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim;
+
+/**
+ * This tests that "add jar" can reference files on HDFS
+ * file systems. The essential check is that after we load
+ * the file onto HDFS, we have that resource available in our
+ * current classpath.
+ */
+public class TestAddJarFromHDFS extends TestCase {
+
+  public void testAddJarFromHDFS() throws IOException {
+    // Create a conf
+    HiveConf conf = new HiveConf(Driver.class);
+
+    // Make sure scratch dir exists
+    File scratchDir = new File(conf.getVar(HiveConf.ConfVars.SCRATCHDIR));
+    if (!scratchDir.exists()) {
+      scratchDir.mkdirs();
+    }
+
+    // Create a DFS
+    MiniDFSShim dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 1, true, null);
+    FileSystem fs = dfs.getFileSystem();
+
+    File tmpJarFile = createSimpleJarFile();
+
+    // Copy it over to the DFS
+    fs.copyFromLocalFile(new Path(tmpJarFile.toString()), new Path("/addJarFromHdfs.jar"));
+
+    // Set up our session
+    SessionState ss = new SessionState(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ss.err = new PrintStream(baos);
+    SessionState.start(ss);
+    CliDriver cliDriver = new CliDriver();
+
+    assertNull("Resource should not be here yet", getTestResource());
+
+    // Execute the "add jar" command
+    String jarURI = fs.getUri().toString() + "/addJarFromHdfs.jar";
+    int ret = cliDriver.processCmd("ADD JAR " + jarURI);
+    assertEquals("ADD JAR failed", 0, ret);
+    // Check that logs contain the right things...
+    assertTrue(baos.toString("UTF-8").contains("Copying hdfs://"));
+    assertTrue(baos.toString("UTF-8").contains("Added"));
+    assertTrue(baos.toString("UTF-8").contains("to class path"));
+
+    // Check that our class path actually contains our new resource
+    assertNotNull("Resource should have been loaded", getTestResource());
+
+    // Remove the resource
+    ret = cliDriver.processCmd("DELETE JAR " + jarURI);
+    assertEquals("DELETE JAR failed", 0, ret);
+
+    assertNull("Resource should have been unloaded", getTestResource());
+
+    // Clean up
+    dfs.shutdown();
+  }
+
+  private URL getTestResource() {
+    URL resource = Thread.currentThread()
+        .getContextClassLoader().getResource("testAddJarFromHDFS.txt");
+    return resource;
+  }
+
+  /**
+   * Creates a jar file with a single entry, "testAddJarFromHDFS.txt".
+   */
+  private File createSimpleJarFile() throws IOException, FileNotFoundException,
+      UnsupportedEncodingException {
+    // Create a jar file
+    File tmpJarFile = File.createTempFile("addJarFromHdfs", ".jar");
+    JarOutputStream jos = new JarOutputStream(new FileOutputStream(tmpJarFile));
+    JarEntry jarEntry = new JarEntry("testAddJarFromHDFS.txt");
+    jos.putNextEntry(jarEntry);
+    jos.write("Just some text".getBytes("UTF-8"));
+    jos.closeEntry();
+    jos.close();
+    return tmpJarFile;
+  }
+
+}
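
For reviewers, here is a minimal sketch of how the new SessionState resource API is meant to be driven programmatically. The class and method names are taken from the patch above; the HDFS URI, the HiveConf setup, and the surrounding main() are illustrative assumptions only, not part of the change:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;

public class AddJarExample {
  public static void main(String[] args) {
    // Hypothetical URI; any jar reachable through a Hadoop FileSystem works.
    String jarUri = "hdfs://namenode:8020/tmp/my-udfs.jar";

    HiveConf conf = new HiveConf(SessionState.class);
    SessionState.start(new SessionState(conf));
    SessionState ss = SessionState.get();

    // add_resource() builds a JarResourceHook: it validates the path, copies
    // the jar into SCRATCHDIR when it is remote, and registers it on the classpath.
    ss.add_resource(ResourceType.JAR, jarUri);

    // list_resource() still reports the name the user entered...
    System.out.println(ss.list_resource(ResourceType.JAR, null));

    // ...while listLocalJarResources() reports the local copies.
    System.out.println(ss.listLocalJarResources());

    // delete_resource() runs the postHook, unregistering the jar
    // and deleting any local copy.
    ss.delete_resource(ResourceType.JAR, jarUri);
  }
}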
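And a sketch of the consumer side: the ExecDriver hunk above selects listLocalJarResources() for JAR resources precisely because Hadoop's -libjars flag only accepts local, comma-separated paths. The helper below is illustrative only; in the patch itself this role is played by ExecDriver's resource handling:

import java.util.Set;
import org.apache.hadoop.hive.ql.session.SessionState;

// Hypothetical helper mirroring what ExecDriver does with JAR resources.
public class LibJars {
  public static String libJarsArg(SessionState ss) {
    Set<String> jars = ss.listLocalJarResources();
    if (jars == null || jars.isEmpty()) {
      return "";
    }
    // Hadoop expects a comma-separated list of local jar paths.
    StringBuilder sb = new StringBuilder();
    for (String jar : jars) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(jar);
    }
    return "-libjars " + sb.toString();
  }
}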