diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 1ddc027..6e9e9e0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -400,7 +400,7 @@ private void getNextPath() throws Exception {
       this.inputSplits = inputSplits;
 
       splitNum = 0;
-      serde = partDesc.getDeserializer();
+      serde = partDesc.getDeserializer(job);
       serde.initialize(job, partDesc.getOverlayedProperties());
 
       if (currTbl != null) {
@@ -636,7 +636,7 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
       // Get the OI corresponding to all the partitions
       for (PartitionDesc listPart : listParts) {
         partition = listPart;
-        Deserializer partSerde = listPart.getDeserializer();
+        Deserializer partSerde = listPart.getDeserializer(job);
         partSerde.initialize(job, listPart.getOverlayedProperties());
 
         partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 82e40c8..aeca531 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -32,7 +32,6 @@
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -85,9 +84,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -99,7 +96,6 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -175,6 +171,9 @@
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 
+import org.apache.commons.codec.binary.Base64;
+//import com.sun.org.apache.xml.internal.security.utils.Base64;
+
 /**
  * Utilities.
  *
@@ -288,7 +287,14 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
       } else {
         localPath = new Path(name);
       }
-      in = new FileInputStream(localPath.toUri().getPath());
+      LOG.info("Loading plan from: "+path.toUri().getPath());
+      String planString = conf.get(path.toUri().getPath());
+      if (planString == null) {
+        LOG.info("Could not find plan!");
+        return null;
+      }
+      byte[] planBytes = Base64.decodeBase64(planString);
+      in = new ByteArrayInputStream(planBytes);
       if(MAP_PLAN_NAME.equals(name)){
         if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
           gWork = deserializePlan(in, MapWork.class, conf);
@@ -313,10 +319,6 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
         gWorkMap.put(path, gWork);
       }
       return gWork;
-    } catch (FileNotFoundException fnf) {
-      // happens. e.g.: no reduce work.
-      LOG.debug("No plan file found: "+path);
-      return null;
     } catch (Exception e) {
       LOG.error("Failed to load plan: "+path, e);
       throw new RuntimeException(e);
     }
@@ -533,27 +535,11 @@ private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScrat
     Path planPath = getPlanPath(conf, name);
 
-    // use the default file system of the conf
-    FileSystem fs = planPath.getFileSystem(conf);
-    FSDataOutputStream out = fs.create(planPath);
+    // add it to the conf
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
     serializePlan(w, out, conf);
-
-    // Serialize the plan to the default hdfs instance
-    // Except for hadoop local mode execution where we should be
-    // able to get the plan directly from the cache
-    if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
-      // Set up distributed cache
-      if (!DistributedCache.getSymlink(conf)) {
-        DistributedCache.createSymlink(conf);
-      }
-      String uriWithLink = planPath.toUri().toString() + "#" + name;
-      DistributedCache.addCacheFile(new URI(uriWithLink), conf);
-
-      // set replication of the plan file to a high number. we use the same
-      // replication factor as used by the hadoop jobclient for job.xml etc.
-      short replication = (short) conf.getInt("mapred.submit.replication", 10);
-      fs.setReplication(planPath, replication);
-    }
+    LOG.info("Setting plan: "+planPath.toUri().getPath());
+    conf.set(planPath.toUri().getPath(), Base64.encodeBase64String(out.toByteArray()));
 
     // Cache the plan in this process
     gWorkMap.put(planPath, w);
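Reviewer note on the Utilities.java hunks above: setBaseWork() no longer writes the serialized plan to a scratch-dir file and registers it in the distributed cache; it Base64-encodes the bytes into the conf under the plan path, and getBaseWork() decodes them back, with a missing key now playing the role of the removed FileNotFoundException branch (e.g. no reduce work). Below is a minimal, self-contained sketch of that round trip, outside Hive; the class name, conf key and payload are made-up stand-ins, not the real plan path or serializePlan() output.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;

public class PlanInConfSketch {

  // Mirror of the new setBaseWork() path: serialize, Base64-encode, store under the plan path.
  static void setPlan(Configuration conf, String planKey, byte[] planBytes) {
    conf.set(planKey, Base64.encodeBase64String(planBytes));
  }

  // Mirror of the new getBaseWork() path: a missing key means "no such plan" (e.g. no reduce work).
  static InputStream getPlan(Configuration conf, String planKey) {
    String planString = conf.get(planKey);
    if (planString == null) {
      return null;
    }
    return new ByteArrayInputStream(Base64.decodeBase64(planString));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write("stand-in for serializePlan(w, out, conf)".getBytes("UTF-8"));
    String planKey = "/tmp/hive-scratch/map.xml";  // hypothetical plan path used as the conf key
    setPlan(conf, planKey, out.toByteArray());
    System.out.println(getPlan(conf, planKey) != null ? "plan found" : "no plan");
  }
}

One thing to weigh in review: whatever is set this way travels inside the job configuration itself, so very large plans now inflate the conf shipped to every task instead of living in a replicated HDFS file.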
- LOG.debug("No plan file found: "+path); - return null; } catch (Exception e) { LOG.error("Failed to load plan: "+path, e); throw new RuntimeException(e); @@ -533,27 +535,11 @@ private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScrat Path planPath = getPlanPath(conf, name); - // use the default file system of the conf - FileSystem fs = planPath.getFileSystem(conf); - FSDataOutputStream out = fs.create(planPath); + // add it to the conf + ByteArrayOutputStream out = new ByteArrayOutputStream(); serializePlan(w, out, conf); - - // Serialize the plan to the default hdfs instance - // Except for hadoop local mode execution where we should be - // able to get the plan directly from the cache - if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) { - // Set up distributed cache - if (!DistributedCache.getSymlink(conf)) { - DistributedCache.createSymlink(conf); - } - String uriWithLink = planPath.toUri().toString() + "#" + name; - DistributedCache.addCacheFile(new URI(uriWithLink), conf); - - // set replication of the plan file to a high number. we use the same - // replication factor as used by the hadoop jobclient for job.xml etc. - short replication = (short) conf.getInt("mapred.submit.replication", 10); - fs.setReplication(planPath, replication); - } + LOG.info("Setting plan: "+planPath.toUri().getPath()); + conf.set(planPath.toUri().getPath(), Base64.encodeBase64String(out.toByteArray())); // Cache the plan in this process gWorkMap.put(planPath, w); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 4a13650..b40129c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -69,6 +69,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; +import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -197,21 +198,15 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, Path tezDir = getTezDir(mrScratchDir); - // write out the operator plan + // set up the operator plan Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false); - LocalResource planLr = createLocalResource(fs, - planPath, LocalResourceType.FILE, - LocalResourceVisibility.APPLICATION); // setup input paths and split info List inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx); Utilities.setInputPaths(conf, inputPaths); - InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, - new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_"))); - // create the directories FileSinkOperators need Utilities.createTmpDirs(conf, mapWork); @@ -221,37 +216,30 @@ private static Vertex createVertex(JobConf conf, MapWork mapWork, // finally create the vertex Vertex map = null; byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf); - if (inputSplitInfo.getNumTasks() != 0) { - map = new Vertex(mapWork.getName(), - new ProcessorDescriptor(MapTezProcessor.class.getName()). 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1e27ef1..79ca10f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -68,6 +68,11 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in
     //Update JobConf using MRInput, info like filename comes via this
     MRInput mrInput = getMRInput(inputs);
+    try {
+      mrInput.tryInitializeFromEvent();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed while initializing MRInput", e);
+    }
     Configuration updatedConf = mrInput.getConfigUpdates();
     if (updatedConf != null) {
       for (Entry<String, String> entry : updatedConf) {
@@ -84,6 +89,10 @@
     if (mapWork == null) {
       mapWork = Utilities.getMapWork(jconf);
       cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
     }
 
     mapOp = new MapOperator();
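Reviewer note on MapRecordProcessor.java: since splits now arrive from the AM, the MRInput has to be initialized from its event before the config updates it carries (for example the current input file name) can be folded into the JobConf that the plan is then loaded from. A small sketch of that ordering; the hunk above ends inside the existing for loop, so the loop body copying entries into the JobConf is an assumption, as are the class and method names.

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.mapreduce.input.MRInput;

public class MRInputInitSketch {

  // Initialize the MRInput from the AM event first, then fold its config updates
  // into the JobConf before the plan is loaded from it.
  static void applyInputConfigUpdates(MRInput mrInput, JobConf jconf) {
    try {
      mrInput.tryInitializeFromEvent();
    } catch (IOException e) {
      throw new RuntimeException("Failed while initializing MRInput", e);
    }
    Configuration updatedConf = mrInput.getConfigUpdates();
    if (updatedConf != null) {
      for (Entry<String, String> entry : updatedConf) {
        jconf.set(entry.getKey(), entry.getValue());  // assumed loop body; the hunk cuts off here
      }
    }
  }
}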
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 6ca4d5b..26a0b61 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -307,8 +307,8 @@ public int hashCode() {
     Class inputFormatClass = part.getInputFileFormatClass();
     String inputFormatClassName = inputFormatClass.getName();
     InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-    String deserializerClassName = part.getDeserializer() == null ? null
-        : part.getDeserializer().getClass().getName();
+    String deserializerClassName = part.getDeserializer(job) == null ? null
+        : part.getDeserializer(job).getClass().getName();
 
     // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
     // we use a configuration variable for the same
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 8a7c3c4..3e89054 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -23,18 +23,18 @@
 import java.util.LinkedHashMap;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * PartitionDesc.
@@ -107,9 +107,12 @@ public void setPartSpec(final LinkedHashMap partSpec) {
   /**
    * Return a deserializer object corresponding to the tableDesc.
    */
-  public Deserializer getDeserializer() {
+  public Deserializer getDeserializer(Configuration conf) {
     try {
-      return MetaStoreUtils.getDeserializer(Hive.get().getConf(), getProperties());
+      Deserializer deserializer = ReflectionUtils.newInstance(conf.getClassByName(
+          getProperties().getProperty(serdeConstants.SERIALIZATION_LIB)).asSubclass(Deserializer.class), conf);
+      deserializer.initialize(conf, getProperties());
+      return deserializer;
     } catch (Exception e) {
       return null;
     }
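Reviewer note on PartitionDesc.java: getDeserializer() now takes a Configuration and instantiates the serde by reflection from the partition's serialization.lib property, instead of going through MetaStoreUtils.getDeserializer(Hive.get().getConf(), ...) and its thread-local Hive session. A standalone sketch of that lookup; the class and method names are illustrative, not part of the patch. Note that, as before, any failure is swallowed and surfaces as a null deserializer to callers such as CombineHiveInputFormat above.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.util.ReflectionUtils;

public class SerDeFromConfSketch {

  // Reflection-based lookup: the serde class name comes from the partition's
  // serialization.lib property, and the instance is built against the passed-in conf,
  // so no Hive session or metastore client is needed on the task side.
  static Deserializer deserializerFor(Configuration conf, Properties partitionProps) {
    try {
      Deserializer deserializer = ReflectionUtils.newInstance(
          conf.getClassByName(partitionProps.getProperty(serdeConstants.SERIALIZATION_LIB))
              .asSubclass(Deserializer.class), conf);
      deserializer.initialize(conf, partitionProps);
      return deserializer;
    } catch (Exception e) {
      // mirrors the patched PartitionDesc.getDeserializer(conf): failures surface as null
      return null;
    }
  }
}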