diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index cff0056..d4f6865 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.io.DruidOutputFormat; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; +import org.apache.hadoop.hive.druid.io.DruidRecordWriter; import org.apache.hadoop.hive.druid.serde.DruidSerDe; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; @@ -520,7 +521,11 @@ public void configureTableJobProperties(TableDesc tableDesc, Map @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { - + try { + DruidStorageHandlerUtils.addDependencyJars(jobConf, DruidRecordWriter.class); + } catch (IOException e) { + Throwables.propagate(e); + } } @Override diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 52e7e8d..8d48e14 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Interner; @@ -28,13 +27,10 @@ import com.google.common.collect.Lists; import com.google.common.io.CharStreams; import com.metamx.common.MapUtils; -import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.NoopEmitter; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.jackson.DefaultObjectMapper; @@ -51,14 +47,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.StringUtils; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.Period; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -67,31 +62,46 @@ import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.Reader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; -import java.net.URI; import java.net.URL; +import java.net.URLDecoder; import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import static org.apache.hadoop.hive.ql.exec.Utilities.jarFinderGetJar; /** * Utils class for Druid storage handler. */ public final class DruidStorageHandlerUtils { + private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandlerUtils.class); + private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile"; + /** * Mapper to use to serialize/deserialize Druid objects (JSON) */ @@ -190,18 +200,17 @@ public static InputStream submitRequest(HttpClient client, Request request) return response; } - public static String getURL(HttpClient client, URL url) throws IOException { try (Reader reader = new InputStreamReader( DruidStorageHandlerUtils.submitRequest(client, new Request(HttpMethod.GET, url)))) { - return CharStreams.toString(reader); + return CharStreams.toString(reader); } } /** * @param taskDir path to the directory containing the segments descriptor info - * the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json - * @param conf hadoop conf to get the file system + * the descriptor path will be .../workingPath/task_id/{@link DruidStorageHandler#SEGMENTS_DESCRIPTOR_DIR_NAME}/*.json + * @param conf hadoop conf to get the file system * * @return List of DataSegments * @@ -290,7 +299,8 @@ public long push() throws IOException { public ArrayList fold(ArrayList druidDataSources, Map stringObjectMap, FoldController foldController, - StatementContext statementContext) throws SQLException { + StatementContext statementContext + ) throws SQLException { druidDataSources.add( MapUtils.getString(stringObjectMap, "datasource") ); @@ -431,4 +441,30 @@ public static Path makeSegmentDescriptorOutputPath(DataSegment pushedSegment, public interface DataPusher { long push() throws IOException; } + + // Thanks, HBase Storage handler + public static void addDependencyJars(Configuration conf, Class... classes) throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + Set jars = new HashSet(); + jars.addAll(conf.getStringCollection("tmpjars")); + for (Class clazz : classes) { + if (clazz == null) { + continue; + } + String path = Utilities.jarFinderGetJar(clazz); + if (path == null) { + throw new RuntimeException( + "Could not find jar for class " + clazz + " in order to ship it to the cluster."); + } + if (!localFs.exists(new Path(path))) { + throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz); + } + jars.add(path.toString()); + } + if (jars.isEmpty()) { + return; + } + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index c32104f..70f3a6b 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -739,7 +739,10 @@ public void createTable(Table tbl, EnvironmentContext envContext) throws Already hook.commitCreateTable(tbl); } success = true; - } finally { + } catch (Exception e){ + LOG.error("Got exception from createTable", e); + } + finally { if (!success && (hook != null)) { hook.rollbackCreateTable(tbl); }