diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index cff0056..d4f6865 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.io.DruidOutputFormat; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; +import org.apache.hadoop.hive.druid.io.DruidRecordWriter; import org.apache.hadoop.hive.druid.serde.DruidSerDe; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; @@ -520,7 +521,11 @@ public void configureTableJobProperties(TableDesc tableDesc, Map @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { - + try { + DruidStorageHandlerUtils.addDependencyJars(jobConf, DruidRecordWriter.class); + } catch (IOException e) { + Throwables.propagate(e); + } } @Override diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 52e7e8d..919ad4f 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -47,15 +47,19 @@ import io.druid.segment.column.ColumnConfig; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.StringUtils; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Period; @@ -73,23 +77,35 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.Reader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.URI; import java.net.URL; +import java.net.URLDecoder; import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import static org.apache.hadoop.hive.ql.exec.Utilities.jarFinderGetJar; /** * Utils class for Druid storage handler. */ public final class DruidStorageHandlerUtils { + private static Log LOG = LogFactory.getLog(DruidStorageHandlerUtils.class); private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile"; /** @@ -431,4 +447,176 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, public interface DataPusher { long push() throws IOException; } + + // Thanks, HBase Storage handler + public static void addDependencyJars(Configuration conf, Class... 
classes) throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + Set jars = new HashSet(); + // Add jars that are already in the tmpjars variable + jars.addAll(conf.getStringCollection("tmpjars")); + + // add jars as we find them to a map of contents jar name so that we can + // avoid + // creating new jars for classes that have already been packaged. + Map packagedClasses = new HashMap(); + + // Add jars containing the specified classes + for (Class clazz : classes) { + if (clazz == null) + continue; + + Path path = findOrCreateJar(clazz, localFs, packagedClasses); + if (path == null) { + LOG.warn("Could not find jar for class " + clazz + " in order to ship it to the cluster."); + continue; + } + if (!localFs.exists(path)) { + LOG.warn("Could not validate jar file " + path + " for class " + clazz); + continue; + } + jars.add(path.toString()); + } + if (jars.isEmpty()) + return; + + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + + /** + * If org.apache.hadoop.util.JarFinder is available (0.23+ hadoop), finds the Jar for a class or + * creates it if it doesn't exist. If the class is in a directory in the classpath, it creates a + * Jar on the fly with the contents of the directory and returns the path to that Jar. If a Jar is + * created, it is created in the system temporary directory. Otherwise, returns an existing jar + * that contains a class of the same name. Maintains a mapping from jar contents to the tmp jar + * created. + * + * @param my_class + * the class to find. + * @param fs + * the FileSystem with which to qualify the returned path. + * @param packagedClasses + * a map of class name to path. + * @return a jar file that contains the class. + * @throws IOException + */ + @SuppressWarnings("deprecation") + private static Path findOrCreateJar(Class my_class, FileSystem fs, + Map packagedClasses) throws IOException { + // attempt to locate an existing jar for the class. 
+ String jar = findContainingJar(my_class, packagedClasses); + if (null == jar || jar.isEmpty()) { + jar = getJar(my_class); + updateMap(jar, packagedClasses); + } + + if (null == jar || jar.isEmpty()) { + return null; + } + + LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); + return new Path(jar).makeQualified(fs); + } + + /** + * Add entries to packagedClasses corresponding to class files contained in + * jar. + * + * @param jar + * The jar whose content to list. + * @param packagedClasses + * map[class -> jar] + */ + private static void updateMap(String jar, Map packagedClasses) throws IOException { + if (null == jar || jar.isEmpty()) { + return; + } + ZipFile zip = null; + try { + zip = new ZipFile(jar); + for (Enumeration iter = zip.entries(); iter.hasMoreElements();) { + ZipEntry entry = iter.nextElement(); + if (entry.getName().endsWith("class")) { + packagedClasses.put(entry.getName(), jar); + } + } + } finally { + if (null != zip) + zip.close(); + } + } + + /** + * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration contexts + * (HBASE-8140) and also for testing on MRv2. First check if we have HADOOP-9426. Lacking that, + * fall back to the backport. + * + * @param my_class + * the class to find. + * @return a jar file that contains the class, or null. + */ + private static String getJar(Class my_class) { + String ret = null; + String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; + Class jarFinder = null; + try { + LOG.debug("Looking for " + hadoopJarFinder + "."); + jarFinder = JavaUtils.loadClass(hadoopJarFinder); + LOG.debug(hadoopJarFinder + " found."); + Method getJar = jarFinder.getMethod("getJar", Class.class); + ret = (String) getJar.invoke(null, my_class); + } catch (ClassNotFoundException e) { + LOG.debug("Using backported JarFinder."); + ret = jarFinderGetJar(my_class); + } catch (InvocationTargetException e) { + // function was properly called, but threw its own exception. 
+ // Unwrap it + // and pass it on. + throw new RuntimeException(e.getCause()); + } catch (Exception e) { + // toss all other exceptions, related to reflection failure + throw new RuntimeException("getJar invocation failed.", e); + } + + return ret; + } + /** + * Find a jar that contains a class of the same name, if any. It will return a jar file, even if + * that is not the first thing on the class path that has a class with the same name. Looks first + * on the classpath and then in the packagedClasses map. + * + * @param my_class + * the class to find. + * @return a jar file that contains the class, or null. + * @throws IOException + */ + private static String findContainingJar(Class my_class, Map packagedClasses) + throws IOException { + ClassLoader loader = my_class.getClassLoader(); + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + + // first search the classpath + for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { + URL url = itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + + // now look in any jars we've packaged using JarFinder. Returns null + // when + // no jar is found. + return packagedClasses.get(class_file); + } }