Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1523187)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -194,6 +194,8 @@
   public static String HADOOP_LOCAL_FS = "file:///";
   public static String MAP_PLAN_NAME = "map.xml";
   public static String REDUCE_PLAN_NAME = "reduce.xml";
+  public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
+  public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
 
   /**
    * ReduceField:
@@ -267,9 +269,18 @@
     return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
   }
 
+  /**
+   * Returns the Map or Reduce plan.
+   * Side effect: the BaseWork returned is also placed in the gWorkMap.
+   * @param conf configuration to read the plan path and mapper/reducer class from
+   * @param name name of the plan file, MAP_PLAN_NAME or REDUCE_PLAN_NAME
+   * @return BaseWork based on the name supplied; will return null if name is null or no plan file is found
+   * @throws RuntimeException if the configuration files are not proper or if the plan cannot be loaded
+   */
   private static BaseWork getBaseWork(Configuration conf, String name) {
     BaseWork gWork = null;
     Path path = null;
+    InputStream in = null;
     try {
       path = getPlanPath(conf, name);
       assert path != null;
@@ -281,24 +292,26 @@
         } else {
           localPath = new Path(name);
         }
-        InputStream in = new FileInputStream(localPath.toUri().getPath());
+        in = new FileInputStream(localPath.toUri().getPath());
         if(MAP_PLAN_NAME.equals(name)){
-          if (ExecMapper.class.getName().equals(conf.get("mapred.mapper.class"))){
+          if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
            gWork = deserializePlan(in, MapWork.class, conf);
-          } else if(RCFileMergeMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+          } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
            gWork = deserializePlan(in, MergeWork.class, conf);
-          } else if(ColumnTruncateMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+          } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
            gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
-          } else if(PartialScanMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+          } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
            gWork = deserializePlan(in, PartialScanWork.class,conf);
          } else {
-            assert false;
+            throw new RuntimeException("unable to determine work from configuration. "
+                + MAPRED_MAPPER_CLASS + " was " + conf.get(MAPRED_MAPPER_CLASS));
          }
-        } else {
-          if(ExecReducer.class.getName().equals(conf.get("mapred.reducer.class"))) {
+        } else if (REDUCE_PLAN_NAME.equals(name)) {
+          if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) {
            gWork = deserializePlan(in, ReduceWork.class, conf);
          } else {
-            assert false;
+            throw new RuntimeException("unable to determine work from configuration. "
+                + MAPRED_REDUCER_CLASS + " was " + conf.get(MAPRED_REDUCER_CLASS));
          }
         }
         gWorkMap.put(path, gWork);
@@ -309,9 +322,14 @@
       LOG.debug("No plan file found: "+path);
       return null;
     } catch (Exception e) {
-      e.printStackTrace();
       LOG.error("Failed to load plan: "+path, e);
       throw new RuntimeException(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException cantBlameMeForTrying) { }
+      }
     }
   }
 
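
Not part of the patch: a minimal, self-contained sketch (hypothetical class and method names, no Hive dependencies) of the two patterns the hunks above apply inside getBaseWork, namely failing fast with a RuntimeException that names the offending mapred.mapper.class value instead of an "assert false" that is a no-op unless the JVM runs with -ea, and closing the plan InputStream in a finally block.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical stand-alone sketch; class and method names are not Hive APIs. */
public class PlanLoadSketch {

  private static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";

  /** Stand-in for Hive's deserializePlan(in, MapWork.class, conf). */
  private static Object deserializePlan(InputStream in) throws IOException {
    return in.read();
  }

  public static Object loadMapPlan(String planFile, String configuredMapper) {
    InputStream in = null;
    try {
      in = new FileInputStream(planFile);
      if ("org.example.KnownMapper".equals(configuredMapper)) {
        return deserializePlan(in);
      }
      // Fail fast with a descriptive message instead of "assert false",
      // which silently falls through when assertions are disabled.
      throw new RuntimeException("unable to determine work from configuration. "
          + MAPRED_MAPPER_CLASS + " was " + configuredMapper);
    } catch (IOException e) {
      throw new RuntimeException("Failed to load plan: " + planFile, e);
    } finally {
      // Close the stream on every exit path; ignore close failures,
      // mirroring the empty catch in the patch.
      if (in != null) {
        try {
          in.close();
        } catch (IOException ignored) {
        }
      }
    }
  }
}

In the actual patch the dispatch also covers RCFileMergeMapper, ColumnTruncateMapper, PartialScanMapper and the ExecReducer path, and a missing plan file is treated as "no plan found" (return null) rather than an error; the sketch only illustrates the error-reporting and stream-closing pattern.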