diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 50805a1..4a6d2f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -296,6 +296,10 @@ public static MapWork getMapWork(Configuration conf) {
     return (MapWork) getBaseWork(conf, MAP_PLAN_NAME);
   }
 
+  public static MapWork getMapWorkWithoutCache(Configuration conf) {
+    return (MapWork) getBaseWorkWithoutCache(conf, MAP_PLAN_NAME);
+  }
+
   public static void setReduceWork(Configuration conf, ReduceWork work) {
     setBaseWork(conf, REDUCE_PLAN_NAME, work);
   }
@@ -356,6 +360,23 @@ public static void setBaseWork(Configuration conf, String name, BaseWork work) {
     gWorkMap.get().put(path, work);
   }
 
+  private static BaseWork getBaseWork(Configuration conf, String name) {
+    Path path = getPlanPath(conf, name);
+    LOG.info("PLAN PATH = " + path);
+    assert path != null;
+    // Serve the plan from the per-thread cache when present; otherwise load and cache it.
+    BaseWork gWork = gWorkMap.get().get(path);
+    if (gWork == null) {
+      gWork = getBaseWorkWithoutCache(conf, name);
+      if (gWork != null) {
+        gWorkMap.get().put(path, gWork);
+      }
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Found plan in cache for name: " + name);
+    }
+    return gWork;
+  }
+
   /**
    * Returns the Map or Reduce plan
    * Side effect: the BaseWork returned is also placed in the gWorkMap
@@ -364,7 +385,7 @@ public static void setBaseWork(Configuration conf, String name, BaseWork work) {
    * @return BaseWork based on the name supplied will return null if name is null
    * @throws RuntimeException if the configuration files are not proper or if plan can not be loaded
    */
-  private static BaseWork getBaseWork(Configuration conf, String name) {
+  private static BaseWork getBaseWorkWithoutCache(Configuration conf, String name) {
     Path path = null;
     InputStream in = null;
     try {
@@ -383,66 +404,60 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
-      BaseWork gWork = gWorkMap.get().get(path);
-      if (gWork == null) {
-        Path localPath;
-        if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
-          localPath = new Path(name);
-        } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
-          localPath = path;
-        } else {
-          LOG.info("***************non-local mode***************");
-          localPath = new Path(name);
-        }
+      Path localPath;
+      if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
+        localPath = new Path(name);
+      } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
         localPath = path;
-        LOG.info("local path = " + localPath);
-        if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
-          LOG.debug("Loading plan from string: "+path.toUri().getPath());
-          String planString = conf.get(path.toUri().getPath());
-          if (planString == null) {
-            LOG.info("Could not find plan string in conf");
-            return null;
-          }
-          byte[] planBytes = Base64.decodeBase64(planString);
-          in = new ByteArrayInputStream(planBytes);
-          in = new InflaterInputStream(in);
-        } else {
-          LOG.info("Open file to read in plan: " + localPath);
-          in = localPath.getFileSystem(conf).open(localPath);
+      } else {
+        LOG.info("***************non-local mode***************");
+        localPath = new Path(name);
+      }
+      localPath = path;
+      LOG.info("local path = " + localPath);
+      if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
+        LOG.debug("Loading plan from string: " + path.toUri().getPath());
+        String planString = conf.get(path.toUri().getPath());
+        if (planString == null) {
+          LOG.info("Could not find plan string in conf");
+          return null;
        }
-
-      if(MAP_PLAN_NAME.equals(name)){
-        if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
-          gWork = deserializePlan(in, MapWork.class, conf);
-        } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-          gWork = deserializePlan(in, MergeFileWork.class, conf);
-        } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-          gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
-        } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
-          gWork = deserializePlan(in, PartialScanWork.class,conf);
-        } else {
-          throw new RuntimeException("unable to determine work from configuration ."
-              + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ;
-        }
-      } else if (REDUCE_PLAN_NAME.equals(name)) {
-        if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) {
-          gWork = deserializePlan(in, ReduceWork.class, conf);
-        } else {
-          throw new RuntimeException("unable to determine work from configuration ."
-              + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
-        }
-      } else if (name.contains(MERGE_PLAN_NAME)) {
+        byte[] planBytes = Base64.decodeBase64(planString);
+        in = new ByteArrayInputStream(planBytes);
+        in = new InflaterInputStream(in);
+      } else {
+        LOG.info("Open file to read in plan: " + localPath);
+        in = localPath.getFileSystem(conf).open(localPath);
+      }
+      BaseWork gWork = null;
+      if (MAP_PLAN_NAME.equals(name)) {
+        if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
          gWork = deserializePlan(in, MapWork.class, conf);
+        } else if (MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+          gWork = deserializePlan(in, MergeFileWork.class, conf);
+        } else if (ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+          gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
+        } else if (PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+          gWork = deserializePlan(in, PartialScanWork.class, conf);
+        } else {
+          throw new RuntimeException("unable to determine work from configuration. "
+              + MAPRED_MAPPER_CLASS + " was " + conf.get(MAPRED_MAPPER_CLASS));
        }
-        gWorkMap.get().put(path, gWork);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("Found plan in cache for name: " + name);
+      } else if (REDUCE_PLAN_NAME.equals(name)) {
+        if (ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) {
+          gWork = deserializePlan(in, ReduceWork.class, conf);
+        } else {
+          throw new RuntimeException("unable to determine work from configuration. "
+              + MAPRED_REDUCER_CLASS + " was " + conf.get(MAPRED_REDUCER_CLASS));
+        }
+      } else if (name.contains(MERGE_PLAN_NAME)) {
+        gWork = deserializePlan(in, MapWork.class, conf);
      }
      return gWork;
    } catch (FileNotFoundException fnf) {
      // happens. e.g.: no reduce work.
      LOG.info("File not found: " + fnf.getMessage());
-      LOG.info("No plan file found: "+path);
+      LOG.info("No plan file found: " + path);
      return null;
    } catch (Exception e) {
      String msg = "Failed to load plan: " + path + ": " + e;
@@ -452,7 +467,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
       if (in != null) {
         try {
           in.close();
-        } catch (IOException cantBlameMeForTrying) { }
+        } catch (IOException cantBlameMeForTrying) {
+        }
       }
     }
   }
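A note for reviewers: the refactor above splits plan lookup into a cached front door (getBaseWork) and an uncached loader (getBaseWorkWithoutCache), keyed by plan path in the per-thread gWorkMap. Below is a minimal, self-contained sketch of that get-or-load pattern; PlanCache and its members are illustrative stand-ins, not Hive APIs.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Stand-in for Utilities' per-thread plan cache (gWorkMap).
    final class PlanCache {
      private static final ThreadLocal<Map<String, Object>> CACHE =
          ThreadLocal.withInitial(HashMap::new);

      // Analog of getBaseWork: consult the cache, else load and remember.
      static Object get(String path, Function<String, Object> loader) {
        Map<String, Object> cache = CACHE.get();
        Object plan = cache.get(path);
        if (plan == null) {
          plan = loader.apply(path); // analog of getBaseWorkWithoutCache
          if (plan != null) {
            cache.put(path, plan);
          }
        }
        return plan;
      }

      // Analog of Utilities.clearWorkMap: drop this thread's cached plans.
      static void clear() {
        CACHE.get().clear();
      }
    }

Callers that can tolerate a cached plan keep using the front door; callers that must see the freshly serialized plan (see the input-format changes below) call the loader directly.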
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 716a6b6..212ee59 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -96,14 +96,18 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
 
-    for (BaseWork work : sparkWork.getAllWork()) {
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
-      SparkTran tran = generate(work);
-      SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
-      sparkPlan.addTran(tran);
-      sparkPlan.connect(parentTran, tran);
-      workToTranMap.put(work, tran);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+    try {
+      for (BaseWork work : sparkWork.getAllWork()) {
+        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+        SparkTran tran = generate(work);
+        SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
+        sparkPlan.addTran(tran);
+        sparkPlan.connect(parentTran, tran);
+        workToTranMap.put(work, tran);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+      }
+    } finally {
+      Utilities.clearWorkMap();
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
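The try/finally above guarantees the per-thread work map is emptied whether plan generation succeeds or throws partway through the loop. A compact sketch of the same guarantee, reusing the PlanCache stand-in from the previous sketch (PlanGenerator and buildTran are likewise illustrative, not Hive APIs):

    // If buildTran throws on any work item, the finally block still runs,
    // so a half-built plan is never left cached for the next query that
    // reuses this thread.
    final class PlanGenerator {
      void generateAll(Iterable<String> works) {
        try {
          for (String work : works) {
            buildTran(work); // may throw at any point in the loop
          }
        } finally {
          PlanCache.clear(); // runs on both success and failure paths
        }
      }

      private void buildTran(String work) {
        // placeholder for the per-work translation step
      }
    }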
LOG.info("File not found: " + fnf.getMessage()); - LOG.info("No plan file found: "+path); + LOG.info("No plan file found: " + path); return null; } catch (Exception e) { String msg = "Failed to load plan: " + path + ": " + e; @@ -452,7 +467,8 @@ private static BaseWork getBaseWork(Configuration conf, String name) { if (in != null) { try { in.close(); - } catch (IOException cantBlameMeForTrying) { } + } catch (IOException cantBlameMeForTrying) { + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 716a6b6..212ee59 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -96,14 +96,18 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception { workToTranMap.clear(); workToParentWorkTranMap.clear(); - for (BaseWork work : sparkWork.getAllWork()) { - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); - SparkTran tran = generate(work); - SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work); - sparkPlan.addTran(tran); - sparkPlan.connect(parentTran, tran); - workToTranMap.put(work, tran); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + try { + for (BaseWork work : sparkWork.getAllWork()) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + SparkTran tran = generate(work); + SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work); + sparkPlan.addTran(tran); + sparkPlan.connect(parentTran, tran); + workToTranMap.put(work, tran); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName()); + } + } finally { + Utilities.clearWorkMap(); } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 6710c14..ae44210 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -567,7 +567,7 @@ public int hashCode() { FileInputFormat.setInputPaths(job, combinablePaths.toArray (new Path[combinablePaths.size()])); Map pathToPartitionInfo = this.pathToPartitionInfo != null ? 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 9d5730d..a755488 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -262,10 +262,10 @@ protected void init(JobConf job) {
     if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       mrwork = (MapWork) Utilities.getMergeWork(job);
       if (mrwork == null) {
-        mrwork = Utilities.getMapWork(job);
+        mrwork = Utilities.getMapWorkWithoutCache(job);
       }
     } else {
-      mrwork = Utilities.getMapWork(job);
+      mrwork = Utilities.getMapWorkWithoutCache(job);
     }
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
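HiveInputFormat.init() makes the same substitution on both branches. A hedged sketch of the resulting selection order, with illustrative stand-ins (fetchMergeWork for Utilities.getMergeWork, loadMapWorkFresh for getMapWorkWithoutCache):

    import java.util.function.Supplier;

    final class WorkResolution {
      // On Tez, merge work wins when present; otherwise, and on every other
      // engine, the map work is loaded fresh rather than from the cache.
      static Object resolveWork(String engine,
          Supplier<Object> fetchMergeWork,
          Supplier<Object> loadMapWorkFresh) {
        if ("tez".equals(engine)) {
          Object merge = fetchMergeWork.get(); // null when no merge work exists
          return merge != null ? merge : loadMapWorkFresh.get();
        }
        return loadMapWorkFresh.get();
      }
    }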