diff --git cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
index 4fcca8c..4a7b0d6 100644
--- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
+++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -27,12 +27,12 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.sql.SQLException;
 
 import jline.ArgumentCompletor;
 import jline.ArgumentCompletor.AbstractArgumentDelimiter;
@@ -271,10 +271,15 @@ int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
           return ret;
         }
 
+        // query has run capture the time
+        long end = System.currentTimeMillis();
+        double timeTaken = (end - start) / 1000.0;
+
        ArrayList res = new ArrayList();
 
        printHeader(qp, out);
 
+        // print the results
        int counter = 0;
        try {
          while (qp.getResults(res)) {
@@ -299,11 +304,8 @@ int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
          ret = cret;
        }
 
-        long end = System.currentTimeMillis();
-        double timeTaken = (end - start) / 1000.0;
        console.printInfo("Time taken: " + timeTaken + " seconds" +
            (counter == 0 ? "" : ", Fetched: " + counter + " row(s)"));
-
      } else {
        String firstToken = tokenizeCmd(cmd.trim())[0];
        String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());
diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7852730..be5eac7 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -820,10 +820,16 @@
     HIVE_JAR_DIRECTORY("hive.jar.directory", "hdfs:///user/hive/"),
     HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
 
-    //Vectorization enabled
+    // Vectorization enabled
     HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
 
     HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
+
+    // Whether to send the query plan via local resource or RPC
+    HIVE_RPC_QUERY_PLAN("hive.rpc.query.plan", false),
+
+    // Whether to generate the splits locally or in the AM (tez only)
+    HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true);
     ;
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 1ddc027..6e9e9e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -400,7 +400,7 @@ private void getNextPath() throws Exception {
       this.inputSplits = inputSplits;
 
       splitNum = 0;
-      serde = partDesc.getDeserializer();
+      serde = partDesc.getDeserializer(job);
       serde.initialize(job, partDesc.getOverlayedProperties());
 
       if (currTbl != null) {
@@ -636,7 +636,7 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
       // Get the OI corresponding to all the partitions
       for (PartitionDesc listPart : listParts) {
         partition = listPart;
-        Deserializer partSerde = listPart.getDeserializer();
+        Deserializer partSerde = listPart.getDeserializer(job);
         partSerde.initialize(job, listPart.getOverlayedProperties());
 
         partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
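
The two configuration flags introduced in HiveConf.java above drive the rest of this patch: hive.rpc.query.plan switches Utilities between writing the serialized plan to the filesystem and stashing it in the job Configuration, and hive.compute.splits.in.am switches DagUtils between client-side and AM-side split generation. Below is a minimal sketch of reading and overriding them through HiveConf; it is not part of the patch, and the class name is illustrative.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class PlanTransportFlagsSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Defaults as declared above: plan-over-RPC off, AM split generation on.
    boolean rpcPlan = HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN);
    boolean amSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION);
    System.out.println("hive.rpc.query.plan=" + rpcPlan
        + ", hive.compute.splits.in.am=" + amSplits);

    // TezTask.execute (further down in this patch) forces the RPC path on.
    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
  }
}
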
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 82e40c8..dfcd2bc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -80,6 +80,7 @@
 import java.util.regex.Pattern;
 
 import org.antlr.runtime.CommonToken;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
 import org.apache.commons.logging.Log;
@@ -87,7 +88,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -95,11 +95,11 @@
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -175,6 +175,8 @@
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 
+import org.apache.commons.codec.binary.Base64;
+
 /**
  * Utilities.
 *
@@ -288,7 +290,20 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
       } else {
         localPath = new Path(name);
       }
-      in = new FileInputStream(localPath.toUri().getPath());
+
+      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+        LOG.info("Loading plan from: "+path.toUri().getPath());
+        String planString = conf.get(path.toUri().getPath());
+        if (planString == null) {
+          LOG.info("Could not find plan!");
+          return null;
+        }
+        byte[] planBytes = Base64.decodeBase64(planString);
+        in = new ByteArrayInputStream(planBytes);
+      } else {
+        in = new FileInputStream(localPath.toUri().getPath());
+      }
+
      if(MAP_PLAN_NAME.equals(name)){
        if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
          gWork = deserializePlan(in, MapWork.class, conf);
@@ -533,26 +548,37 @@ private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScrat
 
     Path planPath = getPlanPath(conf, name);
 
-    // use the default file system of the conf
-    FileSystem fs = planPath.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(planPath);
-    serializePlan(w, out, conf);
-
-    // Serialize the plan to the default hdfs instance
-    // Except for hadoop local mode execution where we should be
-    // able to get the plan directly from the cache
-    if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
-      // Set up distributed cache
-      if (!DistributedCache.getSymlink(conf)) {
-        DistributedCache.createSymlink(conf);
-      }
-      String uriWithLink = planPath.toUri().toString() + "#" + name;
-      DistributedCache.addCacheFile(new URI(uriWithLink), conf);
+    OutputStream out;
+
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+      // add it to the conf
+      out = new ByteArrayOutputStream();
+      serializePlan(w, out, conf);
+      LOG.info("Setting plan: "+planPath.toUri().getPath());
+      conf.set(planPath.toUri().getPath(),
+          Base64.encodeBase64String(((ByteArrayOutputStream)out).toByteArray()));
+    } else {
+      // use the default file system of the conf
+      FileSystem fs = planPath.getFileSystem(conf);
+      out = fs.create(planPath);
+      serializePlan(w, out, conf);
+
+      // Serialize the plan to the default hdfs instance
+      // Except for hadoop local mode execution where we should be
+      // able to get the plan directly from the cache
+      if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
+        // Set up distributed cache
+        if (!DistributedCache.getSymlink(conf)) {
+          DistributedCache.createSymlink(conf);
+        }
+        String uriWithLink = planPath.toUri().toString() + "#" + name;
+        DistributedCache.addCacheFile(new URI(uriWithLink), conf);
 
-      // set replication of the plan file to a high number. we use the same
-      // replication factor as used by the hadoop jobclient for job.xml etc.
-      short replication = (short) conf.getInt("mapred.submit.replication", 10);
-      fs.setReplication(planPath, replication);
+        // set replication of the plan file to a high number. we use the same
+        // replication factor as used by the hadoop jobclient for job.xml etc.
+        short replication = (short) conf.getInt("mapred.submit.replication", 10);
+        fs.setReplication(planPath, replication);
+      }
     }
 
     // Cache the plan in this process
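
When hive.rpc.query.plan is on, setBaseWork and getBaseWork above no longer touch the filesystem: the serialized plan is Base64-encoded into the job Configuration under the plan path and decoded back on the task side. A minimal, self-contained sketch of that round trip follows; it is not part of the patch, and the key and payload are placeholders standing in for serializePlan/deserializePlan output.

import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;

public class PlanViaConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Write side (setBaseWork): encode the serialized plan and store it under the plan path.
    byte[] planBytes = "kryo-serialized-plan".getBytes();   // stand-in for serializePlan() output
    String planKey = "/tmp/hive-scratch/map.xml";           // stand-in for planPath.toUri().getPath()
    conf.set(planKey, Base64.encodeBase64String(planBytes));

    // Read side (getBaseWork): look the string up again and decode it into a stream.
    String planString = conf.get(planKey);
    byte[] recovered = Base64.decodeBase64(planString);
    InputStream in = new ByteArrayInputStream(recovered);   // deserializePlan(in, ...) would consume this
    System.out.println("recovered " + recovered.length + " plan bytes");
  }
}
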
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 4a13650..1726da6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -69,11 +70,12 @@
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
@@ -197,21 +199,15 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork,
 
     Path tezDir = getTezDir(mrScratchDir);
 
-    // write out the operator plan
+    // set up the operator plan
     Path planPath = Utilities.setMapWork(conf, mapWork,
         mrScratchDir.toUri().toString(), false);
-    LocalResource planLr = createLocalResource(fs,
-        planPath, LocalResourceType.FILE,
-        LocalResourceVisibility.APPLICATION);
 
     // setup input paths and split info
     List inputPaths = Utilities.getInputPaths(conf, mapWork,
         mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
 
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf,
-        new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
@@ -220,38 +216,54 @@
 
     // finally create the vertex
     Vertex map = null;
+
+    int numTasks = -1;
+    Class amSplitGeneratorClass = null;
+    InputSplitInfo inputSplitInfo = null;
+
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
+      // if we're generating the splits in the AM, we just need to set
+      // the correct plugin.
+      amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+    } else {
+      // client side split generation means we have to compute them now
+      inputSplitInfo = MRHelpers.generateInputSplits(conf,
+          new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
+      numTasks = inputSplitInfo.getNumTasks();
+    }
+
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
 
-    if (inputSplitInfo.getNumTasks() != 0) {
-      map = new Vertex(mapWork.getName(),
-          new ProcessorDescriptor(MapTezProcessor.class.getName()).
-          setUserPayload(serializedConf),
-          inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
-      Map environment = new HashMap();
-      MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-      map.setTaskEnvironment(environment);
-      map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
-
-      assert mapWork.getAliasToWork().keySet().size() == 1;
-
-      String alias = mapWork.getAliasToWork().keySet().iterator().next();
-      byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
-      map.addInput(alias,
-          new InputDescriptor(MRInput.class.getName()).
-          setUserPayload(mrInput), null);
+    map = new Vertex(mapWork.getName(),
+        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+        setUserPayload(serializedConf), numTasks,
+        MRHelpers.getMapResource(conf));
+    Map environment = new HashMap();
+    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+    map.setTaskEnvironment(environment);
+    map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
 
-      map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    assert mapWork.getAliasToWork().keySet().size() == 1;
 
-      Map localResources = new HashMap();
-      localResources.put(getBaseName(appJarLr), appJarLr);
-      for (LocalResource lr: additionalLr) {
-        localResources.put(getBaseName(lr), lr);
-      }
-      localResources.put(FilenameUtils.getName(planPath.getName()), planLr);
+    String alias = mapWork.getAliasToWork().keySet().iterator().next();
+    byte[] mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+    map.addInput(alias,
+        new InputDescriptor(MRInputLegacy.class.getName()).
+        setUserPayload(mrInput), amSplitGeneratorClass);
+    Map localResources = new HashMap();
+    localResources.put(getBaseName(appJarLr), appJarLr);
+    for (LocalResource lr: additionalLr) {
+      localResources.put(getBaseName(lr), lr);
+    }
+
+    if (inputSplitInfo != null) {
+      // only relevant for client-side split generation
+      map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
       MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
           localResources);
-      map.setTaskLocalResources(localResources);
     }
+
+    map.setTaskLocalResources(localResources);
 
     return map;
   }
@@ -278,11 +290,9 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     LocalResource appJarLr, List additionalLr,
     FileSystem fs, Path mrScratchDir, Context ctx) throws Exception {
 
-    // write out the operator plan
+    // set up operator plan
     Path planPath = Utilities.setReduceWork(conf, reduceWork,
         mrScratchDir.toUri().toString(), false);
-    LocalResource planLr = createLocalResource(fs, planPath,
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
 
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
@@ -308,7 +318,6 @@ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork,
     for (LocalResource lr: additionalLr) {
       localResources.put(getBaseName(lr), lr);
     }
-    localResources.put(FilenameUtils.getName(planPath.getName()), planLr);
 
     reducer.setTaskLocalResources(localResources);
     return reducer;
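
The map-vertex changes in DagUtils above hinge on one branch: either hand Tez an MRInputAMSplitGenerator so the AM computes the splits (numTasks stays -1 and the vertex is sized there), or compute the splits on the client as before. The sketch below isolates that decision; it is not part of the patch, it reuses only the calls that appear in the hunk, and the class and method names wrapping them are illustrative.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;

public class SplitGenerationSketch {

  static void chooseSplitMode(JobConf conf, Path tezDir, String workName) throws Exception {
    int numTasks = -1;                      // -1: let the AM size the vertex from its own splits
    Class amSplitGeneratorClass = null;
    InputSplitInfo inputSplitInfo = null;

    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
      // AM-side generation: only the initializer plugin is wired up here.
      amSplitGeneratorClass = MRInputAMSplitGenerator.class;
    } else {
      // Client-side generation: compute splits now and size the vertex from them.
      inputSplitInfo = MRHelpers.generateInputSplits(conf,
          new Path(tezDir, "split_" + workName.replaceAll(" ", "_")));
      numTasks = inputSplitInfo.getNumTasks();
    }

    System.out.println("numTasks=" + numTasks + ", amSplitGenerator=" + amSplitGeneratorClass);
  }
}
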
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1e27ef1..ca6f4e4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -41,7 +41,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -67,7 +67,12 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in
     super.init(jconf, mrReporter, inputs, out);
 
     //Update JobConf using MRInput, info like filename comes via this
-    MRInput mrInput = getMRInput(inputs);
+    MRInputLegacy mrInput = getMRInput(inputs);
+    try {
+      mrInput.init();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed while initializing MRInput", e);
+    }
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry entry : updatedConf) {
@@ -84,6 +89,10 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in
     if (mapWork == null) {
       mapWork = Utilities.getMapWork(jconf);
       cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
     }
 
     mapOp = new MapOperator();
@@ -126,16 +135,16 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
   }
 
-  private MRInput getMRInput(Map inputs) {
+  private MRInputLegacy getMRInput(Map inputs) {
     //there should be only one MRInput
-    MRInput theMRInput = null;
+    MRInputLegacy theMRInput = null;
     for(LogicalInput inp : inputs.values()){
-      if(inp instanceof MRInput){
+      if(inp instanceof MRInputLegacy){
        if(theMRInput != null){
          throw new IllegalArgumentException("Only one MRInput is expected");
        }
        //a better logic would be to find the alias
-      theMRInput = (MRInput)inp;
+      theMRInput = (MRInputLegacy)inp;
      }
    }
    return theMRInput;
@@ -144,7 +153,7 @@ private MRInput getMRInput(Map inputs) {
 
   @Override
   void run() throws IOException{
-    MRInput in = getMRInput(inputs);
+    MRInputLegacy in = getMRInput(inputs);
     KeyValueReader reader = in.getReader();
 
     //process records until done
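
MapRecordProcessor above now has to handle MRInputLegacy explicitly: the single MR input is located among the LogicalInputs and must be init()-ed before getConfigUpdates() or getReader() are used. A condensed sketch of that pattern follows; it is not part of the patch, and the class and method names are illustrative.

import java.io.IOException;
import java.util.Map;

import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.runtime.api.LogicalInput;

public class MRInputLookupSketch {

  static MRInputLegacy findAndInit(Map<String, LogicalInput> inputs) {
    MRInputLegacy found = null;
    for (LogicalInput in : inputs.values()) {
      if (in instanceof MRInputLegacy) {
        if (found != null) {
          throw new IllegalArgumentException("Only one MRInput is expected");
        }
        found = (MRInputLegacy) in;
      }
    }
    if (found != null) {
      try {
        found.init();   // must be initialized explicitly before the reader is pulled
      } catch (IOException e) {
        throw new RuntimeException("Failed while initializing MRInput", e);
      }
    }
    return found;
  }
}
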
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 9c727de..b8875c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -67,7 +67,7 @@ public int monitorExecution(DAGClient dagClient) throws InterruptedException {
 
     boolean running = false;
     boolean done = false;
-    int checkInterval = 500;
+    int checkInterval = 200;
     int printInterval = 3000;
     int maxRetryInterval = 2500;
     int counter = 0;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 4eda92c..6ba2e2d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
@@ -78,6 +79,9 @@ public int execute(DriverContext driverContext) {
     DAGClient client = null;
     TezSessionState session = null;
 
+    // Tez requires us to use RPC for the query plan
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
+
     try {
       // Get or create Context object. If we create it we have to clean
       // it later as well.
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 6ca4d5b..26a0b61 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -307,8 +307,8 @@ public int hashCode() {
     Class inputFormatClass = part.getInputFileFormatClass();
     String inputFormatClassName = inputFormatClass.getName();
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-    String deserializerClassName = part.getDeserializer() == null ? null
-        : part.getDeserializer().getClass().getName();
+    String deserializerClassName = part.getDeserializer(job) == null ? null
+        : part.getDeserializer(job).getClass().getName();
 
     // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
     // we use a configuration variable for the same
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 8a7c3c4..3e89054 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -23,18 +23,18 @@
 import java.util.LinkedHashMap;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * PartitionDesc.
 */
@@ -107,9 +107,12 @@ public void setPartSpec(final LinkedHashMap partSpec) {
   /**
    * Return a deserializer object corresponding to the tableDesc.
    */
-  public Deserializer getDeserializer() {
+  public Deserializer getDeserializer(Configuration conf) {
     try {
-      return MetaStoreUtils.getDeserializer(Hive.get().getConf(), getProperties());
+      Deserializer deserializer = ReflectionUtils.newInstance(conf.getClassByName(
+          getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)).asSubclass(Deserializer.class), conf);
+      deserializer.initialize(conf, getProperties());
+      return deserializer;
     } catch (Exception e) {
       return null;
     }
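
The reworked PartitionDesc.getDeserializer(Configuration) above no longer goes through MetaStoreUtils and the Hive session; it instantiates the SerDe named by the partition's serialization.lib property directly against the passed-in Configuration. A standalone sketch of the same steps follows; it is not part of the patch, and the class name, method name, and caller-supplied Properties are illustrative.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.util.ReflectionUtils;

public class DeserializerFromPropsSketch {

  static Deserializer instantiate(Configuration conf, Properties partProps) throws Exception {
    // Look up the SerDe class recorded in the partition's properties.
    String serdeLib = partProps.getProperty(serdeConstants.SERIALIZATION_LIB);

    // Instantiate it reflectively and initialize it with the same properties.
    Deserializer deserializer = ReflectionUtils.newInstance(
        conf.getClassByName(serdeLib).asSubclass(Deserializer.class), conf);
    deserializer.initialize(conf, partProps);
    return deserializer;
  }
}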