diff --git metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 3bf2314..21bc9bf 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -33,8 +33,6 @@
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jdo.JDOHelper;
 import javax.jdo.JDOObjectNotFoundException;
@@ -43,6 +41,7 @@
 import javax.jdo.Query;
 import javax.jdo.Transaction;
 import javax.jdo.datastore.DataStoreCache;
+import javax.jdo.datastore.JDOConnection;
 
 import org.antlr.runtime.CharStream;
 import org.antlr.runtime.CommonTokenStream;
@@ -128,10 +127,9 @@
  * filestore.
  */
 public class ObjectStore implements RawStore, Configurable {
-  private static Properties prop = null;
-  private static PersistenceManagerFactory pmf = null;
+  private Properties prop = null;
+  private PersistenceManagerFactory pmf = null;
 
-  private static Lock pmfPropLock = new ReentrantLock();
   private static final Log LOG = LogFactory.getLog(ObjectStore.class.getName());
 
   private static enum TXN_STATUS {
@@ -140,7 +138,7 @@
   private static final Map PINCLASSMAP;
   static {
-    Map map = new HashMap();
+    Map map = new HashMap();
     map.put("table", MTable.class);
     map.put("storagedescriptor", MStorageDescriptor.class);
     map.put("serdeinfo", MSerDeInfo.class);
@@ -173,40 +171,35 @@ public Configuration getConf() {
    */
   @SuppressWarnings("nls")
   public void setConf(Configuration conf) {
-    // Although an instance of ObjectStore is accessed by one thread, there may
-    // be many threads with ObjectStore instances. So the static variables
-    // pmf and prop need to be protected with locks.
-    pmfPropLock.lock();
-    try {
-      isInitialized = false;
-      hiveConf = conf;
-      Properties propsFromConf = getDataSourceProps(conf);
-      boolean propsChanged = !propsFromConf.equals(prop);
-
-      if (propsChanged) {
-        pmf = null;
-        prop = null;
-      }
-
-      assert(!isActiveTransaction());
-      shutdown();
-      // Always want to re-create pm as we don't know if it were created by the
-      // most recent instance of the pmf
-      pm = null;
-      openTrasactionCalls = 0;
-      currentTransaction = null;
-      transactionStatus = TXN_STATUS.NO_STATE;
-
-      initialize(propsFromConf);
-
-      if (!isInitialized) {
-        throw new RuntimeException(
-            "Unable to create persistence manager. Check dss.log for details");
-      } else {
-        LOG.info("Initialized ObjectStore");
-      }
-    } finally {
-      pmfPropLock.unlock();
+    isInitialized = false;
+    hiveConf = conf;
+    Properties propsFromConf = getDataSourceProps(conf);
+    boolean propsChanged = !propsFromConf.equals(prop);
+
+    if (!propsChanged) {
+      return;
+    }
+
+    prop = null;
+
+    assert(!isActiveTransaction());
+    shutdown();
+    // Always want to re-create pm as we don't know if it were created by the
+    // most recent instance of the pmf
+    pm = null;
+    pmf = null;
+
+    openTrasactionCalls = 0;
+    currentTransaction = null;
+    transactionStatus = TXN_STATUS.NO_STATE;
+
+    initialize(propsFromConf);
+
+    if (!isInitialized) {
+      throw new RuntimeException(
+          "Unable to create persistence manager. Check dss.log for details");
+    } else {
+      LOG.info("Initialized ObjectStore");
     }
   }
@@ -258,7 +251,7 @@ private static Properties getDataSourceProps(Configuration conf) {
     return prop;
   }
 
-  private static synchronized PersistenceManagerFactory getPMF() {
+  private PersistenceManagerFactory getPMF() {
     if (pmf == null) {
       pmf = JDOHelper.getPersistenceManagerFactory(prop);
       DataStoreCache dsc = pmf.getDataStoreCache();
@@ -296,6 +289,9 @@ public void shutdown() {
     if (pm != null) {
       pm.close();
     }
+    if (pmf != null) {
+      pmf.close();
+    }
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index b28d16e..0a297fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -206,26 +206,18 @@ public boolean isEmptyTable() {
     return work.getTblDir() == null
         && (work.getPartDir() == null || work.getPartDir().isEmpty());
   }
 
-  /**
-   * A cache of InputFormat instances.
-   */
-  private static Map> inputFormats = new HashMap>();
-
-  @SuppressWarnings("unchecked")
-  static InputFormat getInputFormatFromCache(Class inputFormatClass,
+  private InputFormat getInputFormat(Class inputFormatClass,
       Configuration conf) throws IOException {
-    if (!inputFormats.containsKey(inputFormatClass)) {
-      try {
-        InputFormat newInstance = (InputFormat) ReflectionUtils
-            .newInstance(inputFormatClass, conf);
-        inputFormats.put(inputFormatClass, newInstance);
-      } catch (Exception e) {
-        throw new IOException("Cannot create an instance of InputFormat class "
-            + inputFormatClass.getName() + " as specified in mapredWork!", e);
-      }
+    try {
+      InputFormat newInstance = (InputFormat) ReflectionUtils
+          .newInstance(inputFormatClass, conf);
+      return newInstance;
+    } catch (Exception e) {
+      throw new IOException("Cannot create an instance of InputFormat class "
+          + inputFormatClass.getName() + " as specified in mapredWork!", e);
     }
-    return inputFormats.get(inputFormatClass);
   }
+
   private StructObjectInspector getRowInspectorFromTable(TableDesc table)
       throws Exception {
     Deserializer serde = table.getDeserializerClass().newInstance();
@@ -376,7 +368,7 @@ private void getNextPath() throws Exception {
       }
 
       Class formatter = partDesc.getInputFileFormatClass();
-      inputFormat = getInputFormatFromCache(formatter, job);
+      inputFormat = getInputFormat(formatter, job);
       Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
       InputSplit[] splits = inputFormat.getSplits(job, 1);
       FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
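
Note on the two changes above: ObjectStore now owns its PersistenceManagerFactory per instance (and closes it in shutdown()), and FetchOperator drops the static InputFormat cache in favor of creating a fresh instance per call through ReflectionUtils. A minimal standalone sketch of that per-call creation pattern follows; the class and method names here are illustrative, not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    public final class PerCallFactory {
      private PerCallFactory() {}

      // Create a new instance bound to the caller's Configuration. ReflectionUtils
      // also calls setConf() when the class is Configurable, so nothing built here
      // is shared with, or later reconfigured by, another thread.
      public static <T> T newInstanceFor(Class<? extends T> cls, Configuration conf)
          throws IOException {
        try {
          return ReflectionUtils.newInstance(cls, conf);
        } catch (Exception e) {
          throw new IOException("Cannot create an instance of " + cls.getName(), e);
        }
      }
    }

The trade-off is one extra reflective instantiation per lookup instead of a shared, unsynchronized HashMap that could be mutated from several threads at once.
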
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index b4b2c90..6320dd8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -165,9 +165,21 @@
   private static Log LOG = LogFactory.getLog(FunctionRegistry.class);
 
   /**
-   * The mapping from expression function names to expression classes.
+   * Contains all the native functions.
    */
-  static Map<String, FunctionInfo> mFunctions = Collections.synchronizedMap(new LinkedHashMap<String, FunctionInfo>());
+  static Map<String, FunctionInfo> mFunctions = new LinkedHashMap<String, FunctionInfo>();
+
+  /**
+   * Contains all temporary functions registered by the current thread.
+   */
+  static ThreadLocal<Map<String, FunctionInfo>> mFunctionsTempTL
+      = new ThreadLocal<Map<String, FunctionInfo>>() {
+    @Override
+    protected synchronized Map<String, FunctionInfo> initialValue() {
+      return new LinkedHashMap<String, FunctionInfo>();
+    }
+  };
+
   /*
    * PTF variables
@@ -481,7 +493,12 @@ public static void registerUDF(boolean isNative, String functionName,
     if (UDF.class.isAssignableFrom(UDFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, displayName,
           new GenericUDFBridge(displayName, isOperator, UDFClass));
-      mFunctions.put(functionName.toLowerCase(), fI);
+
+      if (isNative) {
+        mFunctions.put(functionName.toLowerCase(), fI);
+      } else {
+        mFunctionsTempTL.get().put(functionName.toLowerCase(), fI);
+      }
     } else {
       throw new RuntimeException("Registering UDF Class " + UDFClass
           + " which does not extend " + UDF.class);
@@ -503,7 +520,11 @@ public static void registerGenericUDF(boolean isNative, String functionName,
     if (GenericUDF.class.isAssignableFrom(genericUDFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, functionName,
           (GenericUDF) ReflectionUtils.newInstance(genericUDFClass, null));
-      mFunctions.put(functionName.toLowerCase(), fI);
+      if (isNative) {
+        mFunctions.put(functionName.toLowerCase(), fI);
+      } else {
+        mFunctionsTempTL.get().put(functionName.toLowerCase(), fI);
+      }
     } else {
       throw new RuntimeException("Registering GenericUDF Class " + genericUDFClass
           + " which does not extend " + GenericUDF.class);
@@ -525,7 +546,11 @@ public static void registerGenericUDTF(boolean isNative, String functionName,
     if (GenericUDTF.class.isAssignableFrom(genericUDTFClass)) {
       FunctionInfo fI = new FunctionInfo(isNative, functionName,
           (GenericUDTF) ReflectionUtils.newInstance(genericUDTFClass, null));
-      mFunctions.put(functionName.toLowerCase(), fI);
+      if (isNative) {
+        mFunctions.put(functionName.toLowerCase(), fI);
+      } else {
+        mFunctionsTempTL.get().put(functionName.toLowerCase(), fI);
+      }
     } else {
       throw new RuntimeException("Registering GenericUDTF Class " + genericUDTFClass
           + " which does not extend " + GenericUDTF.class);
@@ -533,7 +558,12 @@
   }
 
   public static FunctionInfo getFunctionInfo(String functionName) {
-    return mFunctions.get(functionName.toLowerCase());
+    // try temporary ones first
+    FunctionInfo fi = mFunctionsTempTL.get().get(functionName.toLowerCase());
+    if (fi == null) {
+      fi = mFunctions.get(functionName.toLowerCase());
+    }
+    return fi;
   }
 
   /**
@@ -543,7 +573,10 @@ public static FunctionInfo getFunctionInfo(String functionName) {
    * @return set of strings contains function names
    */
   public static Set<String> getFunctionNames() {
-    return mFunctions.keySet();
+    HashSet<String> allFuncNames = new HashSet<String>();
+    allFuncNames.addAll(mFunctions.keySet());
+    allFuncNames.addAll(mFunctionsTempTL.get().keySet());
+    return allFuncNames;
   }
 
   /**
@@ -563,7 +596,7 @@ public static FunctionInfo getFunctionInfo(String functionName) {
     } catch (PatternSyntaxException e) {
       return funcNames;
     }
-    for (String funcName : mFunctions.keySet()) {
+    for (String funcName : getFunctionNames()) {
       if (funcPattern.matcher(funcName).matches()) {
         funcNames.add(funcName);
       }
@@ -587,11 +620,11 @@ public static FunctionInfo getFunctionInfo(String functionName) {
     }
     Class funcClass = funcInfo.getFunctionClass();
 
-    for (String name : mFunctions.keySet()) {
+    for (String name : getFunctionNames()) {
       if (name.equals(funcName)) {
         continue;
       }
-      if (mFunctions.get(name).getFunctionClass().equals(funcClass)) {
+      if (getFunctionInfo(name).getFunctionClass().equals(funcClass)) {
         synonyms.add(name);
       }
     }
@@ -819,8 +852,13 @@ static void registerGenericUDAF(String functionName,
   public static void registerGenericUDAF(boolean isNative, String functionName,
       GenericUDAFResolver genericUDAFResolver) {
-    mFunctions.put(functionName.toLowerCase(), new FunctionInfo(isNative,
-        functionName.toLowerCase(), genericUDAFResolver));
+    if (isNative) {
+      mFunctions.put(functionName.toLowerCase(), new FunctionInfo(isNative,
+          functionName.toLowerCase(), genericUDAFResolver));
+    } else {
+      mFunctionsTempTL.get().put(functionName.toLowerCase(), new FunctionInfo(isNative,
+          functionName.toLowerCase(), genericUDAFResolver));
+    }
   }
 
   public static void registerTemporaryUDAF(String functionName,
@@ -834,16 +872,22 @@ static void registerUDAF(String functionName, Class udafClass) {
   public static void registerUDAF(boolean isNative, String functionName,
       Class udafClass) {
-    mFunctions.put(functionName.toLowerCase(), new FunctionInfo(isNative,
-        functionName.toLowerCase(), new GenericUDAFBridge(
-        (UDAF) ReflectionUtils.newInstance(udafClass, null))));
+    if (isNative) {
+      mFunctions.put(functionName.toLowerCase(), new FunctionInfo(isNative,
+          functionName.toLowerCase(), new GenericUDAFBridge(
+          (UDAF) ReflectionUtils.newInstance(udafClass, null))));
+    } else {
+      mFunctionsTempTL.get().put(functionName.toLowerCase(), new FunctionInfo(isNative,
+          functionName.toLowerCase(), new GenericUDAFBridge(
+          (UDAF) ReflectionUtils.newInstance(udafClass, null))));
+    }
   }
 
   public static void unregisterTemporaryUDF(String functionName) throws HiveException {
-    FunctionInfo fi = mFunctions.get(functionName.toLowerCase());
+    FunctionInfo fi = getFunctionInfo(functionName.toLowerCase());
     if (fi != null) {
       if (!fi.isNative()) {
-        mFunctions.remove(functionName.toLowerCase());
+        mFunctionsTempTL.get().remove(functionName.toLowerCase());
       } else {
         throw new HiveException("Function " + functionName
             + " is hive native, it can't be dropped");
@@ -851,11 +895,20 @@ public static void unregisterTemporaryUDF(String functionName) throws HiveExcept
     }
   }
 
+  /**
+   * Unregister all temporary functions registered by the current thread.
+   */
+  public static void unregisterTemporaryUDFs() {
+    mFunctionsTempTL.get().clear();
+  }
+
   public static GenericUDAFResolver getGenericUDAFResolver(String functionName) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Looking up GenericUDAF: " + functionName);
     }
-    FunctionInfo finfo = mFunctions.get(functionName.toLowerCase());
+    FunctionInfo finfo = getFunctionInfo(functionName.toLowerCase());
     if (finfo == null) {
       return null;
     }
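
The FunctionRegistry change above keeps native functions in the shared map and moves temporary functions into a ThreadLocal map, with getFunctionInfo() consulting the thread-local map first. A self-contained sketch of that two-level lookup follows; the TwoLevelRegistry class is illustrative and not part of Hive.

    import java.util.LinkedHashMap;
    import java.util.Map;

    class TwoLevelRegistry<V> {
      // shared entries (native functions), populated once at class-load time
      private final Map<String, V> shared = new LinkedHashMap<String, V>();

      // per-thread entries (temporary functions); each thread lazily gets its own map
      private final ThreadLocal<Map<String, V>> perThread =
          new ThreadLocal<Map<String, V>>() {
            @Override protected Map<String, V> initialValue() {
              return new LinkedHashMap<String, V>();
            }
          };

      void register(String name, V value, boolean isNative) {
        if (isNative) {
          shared.put(name.toLowerCase(), value);
        } else {
          perThread.get().put(name.toLowerCase(), value); // visible to this thread only
        }
      }

      V lookup(String name) {
        V v = perThread.get().get(name.toLowerCase()); // temporary entries win, as in getFunctionInfo()
        return v != null ? v : shared.get(name.toLowerCase());
      }

      void clearTemporary() {
        perThread.get().clear(); // mirrors unregisterTemporaryUDFs()
      }
    }

Dropping Collections.synchronizedMap on the shared map is presumably safe because native registrations happen only from the class's static initializer, while all per-session registrations now land in the thread-local map.
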
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 95f1e2c..042c5af 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -77,8 +77,6 @@
   private transient ExecMapperContext execContext;
 
-  private static int seqId;
-
   // It can be optimized later so that an operator operator (init/close) is performed
   // only after that operation has been performed on all the parents. This will require
   // initializing the whole tree in all the mappers (which might be required for mappers
@@ -100,21 +98,32 @@
   protected transient State state = State.UNINIT;
 
-  static transient boolean fatalError = false; // fatalError is shared acorss
-  // all operators
-
-  static {
-    seqId = 0;
-  }
+  static transient boolean fatalError = false; // fatalError is shared across all operators
+
+  private static ThreadLocal<Integer> tlSeqId = new ThreadLocal<Integer>() {
+    @Override
+    protected Integer initialValue() {
+      return new Integer(0);
+    }
+  };
 
   private boolean useBucketizedHiveInputFormat;
 
   public Operator() {
-    id = String.valueOf(seqId++);
+    setId();
   }
 
+  /**
+   * Sets the operator id from the static thread-local counter.
+   */
+  private void setId() {
+    id = String.valueOf(tlSeqId.get());
+    // increment seq id
+    tlSeqId.set(new Integer(tlSeqId.get().intValue() + 1));
+  }
+
   public static void resetId() {
-    seqId = 0;
+    tlSeqId.set(new Integer(0));
   }
 
   /**
@@ -125,7 +134,7 @@ public static void resetId() {
    */
   public Operator(Reporter reporter) {
     this.reporter = reporter;
-    id = String.valueOf(seqId++);
+    setId();
   }
 
   public void setChildOperators(
@@ -597,8 +606,8 @@ public void close(boolean abort) throws HiveException {
       for (Operator op : childOperators) {
         op.close(abort);
       }
 
-      LOG.info(id + " Close done");
+      resetId();
     } catch (HiveException e) {
       e.printStackTrace();
       throw e;
@@ -1111,7 +1120,7 @@ protected static StructObjectInspector initEvaluatorsAndReturnStruct(
     C991, C992, C993, C994, C995, C996, C997, C998, C999, C1000
   };
 
-  private static int totalNumCntrs = 1000;
+  private static final int totalNumCntrs = 1000;
 
   /**
    * populated at runtime from hadoop counters at run time in the client.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b789d78..438fffd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1829,7 +1829,7 @@ public void run() {
             Class inputFormatCls = partDesc
                 .getInputFileFormatClass();
-            InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
+            InputFormat inputFormatObj = HiveInputFormat.getInputFormat(
                 inputFormatCls, myJobConf);
             if (inputFormatObj instanceof ContentSummaryInputFormat) {
               resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
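
With the Operator change above, operator ids come from a thread-local counter rather than a shared static int, so queries compiled concurrently on different threads no longer interleave each other's sequence numbers. A compact sketch of the same counter idiom, under an illustrative class name:

    final class PerThreadSequence {
      private static final ThreadLocal<Integer> SEQ = new ThreadLocal<Integer>() {
        @Override protected Integer initialValue() {
          return Integer.valueOf(0);
        }
      };

      // Return this thread's current value and advance it, as setId() does.
      static int next() {
        int current = SEQ.get().intValue();
        SEQ.set(Integer.valueOf(current + 1));
        return current;
      }

      // Restart numbering at zero for this thread only, as resetId() does.
      static void reset() {
        SEQ.set(Integer.valueOf(0));
      }

      private PerThreadSequence() {
      }
    }
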
diff --git ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
index 97436c5..884398b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
@@ -115,7 +115,11 @@
   private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN);
 
   // temp buffer for parsed dataa
-  private static Map<String, String> parseBuffer = new HashMap<String, String>();
+  private static ThreadLocal<Map<String, String>> parseBuffer = new ThreadLocal<Map<String, String>>() {
+    @Override protected Map<String, String> initialValue() {
+      return new HashMap<String, String>();
+    }
+  };
 
   /**
    * Listner interface Parser will call handle function for each record type.
@@ -174,12 +178,12 @@ private static void parseLine(String line, Listener l) throws IOException {
       String tuple = matcher.group(0);
       String[] parts = tuple.split("=");
 
-      parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
+      parseBuffer.get().put(parts[0], parts[1].substring(1, parts[1].length() - 1));
     }
 
-    l.handle(RecordTypes.valueOf(recType), parseBuffer);
+    l.handle(RecordTypes.valueOf(recType), parseBuffer.get());
 
-    parseBuffer.clear();
+    parseBuffer.get().clear();
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
index 427ea12..c99f05e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
@@ -45,7 +45,7 @@
  */
 public class AggregateIndexHandler extends CompactIndexHandler {
 
-  private static Index index = null;
+  private Index index = null;
 
   @Override
   public void analyzeIndexDefinition(Table baseTable, Index idx,
diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
index c52624c..a6fd702 100644
--- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexedInputFormat.java
@@ -84,7 +84,7 @@ public HiveIndexedInputFormat(String indexFileName) {
       // create a new InputFormat instance if this is the first time to see this
       // class
       Class inputFormatClass = part.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      InputFormat inputFormat = getInputFormat(inputFormatClass, job);
       Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
 
       FileInputFormat.setInputPaths(newjob, dir);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index 49145b7..788c7e9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -71,7 +71,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
     pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath()
         .toString(), hsplit.getPath().toUri().getPath());
 
-    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
+    InputFormat inputFormat = getInputFormat(inputFormatClass,
         cloneJobConf);
     BucketizedHiveRecordReader rr= new BucketizedHiveRecordReader(inputFormat, hsplit, cloneJobConf,
         reporter);
@@ -123,7 +123,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
       // create a new InputFormat instance if this is the first time to see this
       // class
       Class inputFormatClass = part.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      InputFormat inputFormat = getInputFormat(inputFormatClass, job);
       newjob.setInputFormat(inputFormat.getClass());
 
       FileStatus[] listStatus = listStatus(newjob, dir);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 9ab4b24..82fbbc4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -300,7 +300,7 @@ public int hashCode() {
       // Use HiveInputFormat if any of the paths is not splittable
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      InputFormat inputFormat = getInputFormat(inputFormatClass, job);
       String deserializerClassName = part.getDeserializerClass() == null ? null
           : part.getDeserializerClass().getName();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index 6318b2f..772c10a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -54,7 +54,7 @@ public CombineHiveRecordReader(InputSplit split, Configuration conf,
       throw new IOException("CombineHiveRecordReader: class not found "
           + inputFormatClassName);
     }
-    InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(
+    InputFormat inputFormat = HiveInputFormat.getInputFormat(
        inputFormatClass, jobConf);
 
     // create a split for the given partition
* @@ -181,13 +189,8 @@ public static boolean checkInputFormat(FileSystem fs, HiveConf conf, } if (checkerCls != null) { - InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache - .get(checkerCls); try { - if (checkerInstance == null) { - checkerInstance = checkerCls.newInstance(); - inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance); - } + InputFormatChecker checkerInstance = checkerCls.newInstance(); return checkerInstance.validateInput(fs, conf, files); } catch (Exception e) { throw new HiveException(e); @@ -262,10 +265,12 @@ public static PartitionDesc getPartitionDescFromPathRecursively( } public static PartitionDesc getPartitionDescFromPathRecursively( - Map pathToPartitionInfo, Path dir, - Map, Map> cacheMap, - boolean ignoreSchema) throws IOException { - + Map pathToPartitionInfo, Path dir, + Map, Map> cacheMap, + boolean ignoreSchema) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("looking up " + dir + " in " + pathToPartitionInfo.keySet()); + } PartitionDesc part = doGetPartitionDescFromPath(pathToPartitionInfo, dir); if (part == null diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index adf4923..c470aee 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -21,6 +21,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.Serializable; +import java.io.Writer; +import java.io.StringWriter; +import java.io.PrintWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -41,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; @@ -177,28 +182,16 @@ public void configure(JobConf job) { this.job = job; } - /** - * A cache of InputFormat instances. 
- */ - protected static Map> inputFormats; - - public static InputFormat getInputFormatFromCache( - Class inputFormatClass, JobConf job) throws IOException { - - if (inputFormats == null) { - inputFormats = new HashMap>(); - } - if (!inputFormats.containsKey(inputFormatClass)) { - try { - InputFormat newInstance = (InputFormat) ReflectionUtils - .newInstance(inputFormatClass, job); - inputFormats.put(inputFormatClass, newInstance); - } catch (Exception e) { - throw new IOException("Cannot create an instance of InputFormat class " - + inputFormatClass.getName() + " as specified in mapredWork!", e); - } + public static InputFormat getInputFormat( + Class inputFormatClass, JobConf job) throws IOException { + try { + InputFormat newInstance = (InputFormat) ReflectionUtils + .newInstance(inputFormatClass, job); + return newInstance; + } catch (Exception e) { + throw new IOException("Cannot create an instance of InputFormat class " + + inputFormatClass.getName() + " as specified in mapredWork!", e); } - return inputFormats.get(inputFormatClass); } public RecordReader getRecordReader(InputSplit split, JobConf job, @@ -233,7 +226,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, pushProjectionsAndFilters(cloneJobConf, inputFormatClass, hsplit.getPath() .toString(), hsplit.getPath().toUri().getPath(), nonNative); - InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, + InputFormat inputFormat = getInputFormat(inputFormatClass, cloneJobConf); RecordReader innerReader = null; try { @@ -264,17 +257,17 @@ protected void init(JobConf job) { if (dirs.length == 0) { throw new IOException("No input paths specified in job"); } + JobConf newjob = new JobConf(job); ArrayList result = new ArrayList(); // for each dir, get the InputFormat, and do getSplits. for (Path dir : dirs) { PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); - // create a new InputFormat instance if this is the first time to see this - // class Class inputFormatClass = part.getInputFileFormatClass(); - InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); - Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob); + InputFormat inputFormat = getInputFormat(inputFormatClass, job); + + Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob); // Make filter pushdown information available to getSplits. 
ArrayList aliases = @@ -315,7 +308,7 @@ public void validateInput(JobConf job) throws IOException { PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir); // create a new InputFormat instance if this is the first time to see this // class - InputFormat inputFormat = getInputFormatFromCache(part + InputFormat inputFormat = getInputFormat(part .getInputFileFormatClass(), job); FileInputFormat.setInputPaths(newjob, dir); diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 87a584d..5053c68 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -43,10 +43,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3.S3FileSystem; +import org.apache.hadoop.fs.s3native.NativeS3FileSystem; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaException; @@ -174,11 +177,16 @@ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException { return db; } + /** + * Get the thread-local Hive. Assumption is that it has been + * set up already. + * @return + * @throws HiveException + */ public static Hive get() throws HiveException { Hive db = hiveDB.get(); if (db == null) { - db = new Hive(new HiveConf(Hive.class)); - hiveDB.set(db); + throw new HiveException("Hive.get() called without a hive db setup"); } return db; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java index 308b5aa..26a85ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java @@ -50,7 +50,6 @@ * */ public final class RewriteCanApplyProcFactory { - private static RewriteCanApplyCtx canApplyCtx = null; private RewriteCanApplyProcFactory(){ //this prevents the class from getting instantiated @@ -60,6 +59,8 @@ private RewriteCanApplyProcFactory(){ * Check for conditions in FilterOperator that do not meet rewrite criteria. */ private static class CheckFilterProc implements NodeProcessor { + private RewriteCanApplyCtx canApplyCtx = null; + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FilterOperator operator = (FilterOperator)nd; @@ -94,6 +95,7 @@ public static CheckFilterProc canApplyOnFilterOperator() { * */ private static class CheckGroupByProc implements NodeProcessor { + private RewriteCanApplyCtx canApplyCtx = null; public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { @@ -199,6 +201,8 @@ public static CheckGroupByProc canApplyOnGroupByOperator() { * Check for conditions in SelectOperator that do not meet rewrite criteria. */ private static class CheckSelectProc implements NodeProcessor { + private RewriteCanApplyCtx canApplyCtx = null; + public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
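
With the Hive.java change above, the no-argument Hive.get() no longer creates a thread-local Hive object lazily; a thread is expected to establish one with a HiveConf first. A hedged usage sketch (the surrounding class and method are illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class HiveAccessExample {
      public static void run() throws HiveException {
        HiveConf conf = new HiveConf(HiveAccessExample.class);
        Hive db = Hive.get(conf);   // sets up the thread-local Hive for this thread
        // ... use db on this same thread ...
        Hive same = Hive.get();     // fine: returns the instance set up above
        // On a thread that never called Hive.get(conf), Hive.get() now throws
        // HiveException("Hive.get() called without a hive db setup").
      }
    }
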
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
index 308b5aa..26a85ac 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
@@ -50,7 +50,6 @@
  *
  */
 public final class RewriteCanApplyProcFactory {
-  private static RewriteCanApplyCtx canApplyCtx = null;
 
   private RewriteCanApplyProcFactory(){
     //this prevents the class from getting instantiated
@@ -60,6 +59,8 @@ private RewriteCanApplyProcFactory(){
    * Check for conditions in FilterOperator that do not meet rewrite criteria.
    */
   private static class CheckFilterProc implements NodeProcessor {
+    private RewriteCanApplyCtx canApplyCtx = null;
+
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       FilterOperator operator = (FilterOperator)nd;
@@ -94,6 +95,7 @@ public static CheckFilterProc canApplyOnFilterOperator() {
    *
    */
   private static class CheckGroupByProc implements NodeProcessor {
+    private RewriteCanApplyCtx canApplyCtx = null;
 
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
@@ -199,6 +201,8 @@ public static CheckGroupByProc canApplyOnGroupByOperator() {
    * Check for conditions in SelectOperator that do not meet rewrite criteria.
    */
   private static class CheckSelectProc implements NodeProcessor {
+    private RewriteCanApplyCtx canApplyCtx = null;
+
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       SelectOperator operator = (SelectOperator)nd;
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
index 1d8336f..be8be19 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
@@ -68,13 +68,13 @@
  */
 public final class RewriteQueryUsingAggregateIndex {
   private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName());
-  private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
 
   private RewriteQueryUsingAggregateIndex() {
     //this prevents the class from getting instantiated
   }
 
   private static class NewQuerySelectSchemaProc implements NodeProcessor {
+    private RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       SelectOperator operator = (SelectOperator)nd;
@@ -121,6 +121,8 @@ public static NewQuerySelectSchemaProc getNewQuerySelectSchemaProc(){
    *
    */
   private static class ReplaceTableScanOpProc implements NodeProcessor {
+    private RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
+
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       TableScanOperator scanOperator = (TableScanOperator)nd;
@@ -225,6 +227,8 @@ public static ReplaceTableScanOpProc getReplaceTableScanProc(){
    * new Aggregation Desc of the new GroupByOperator.
    */
   private static class NewQueryGroupbySchemaProc implements NodeProcessor {
+    private RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
+
     public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       GroupByOperator operator = (GroupByOperator)nd;
diff --git ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
index 0d0bf47..3d492a6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
@@ -60,10 +60,14 @@ public static CommandProcessor get(String cmd, HiveConf conf) {
         return new Driver();
       }
 
-      Driver drv = mapDrivers.get(conf);
-      if (drv == null) {
-        drv = new Driver();
-        mapDrivers.put(conf, drv);
+      Driver drv = null;
+
+      synchronized(mapDrivers) {
+        drv = mapDrivers.get(conf);
+        if (drv == null) {
+          drv = new Driver();
+          mapDrivers.put(conf, drv);
+        }
       }
       drv.init();
       return drv;
@@ -73,11 +77,12 @@ public static CommandProcessor get(String cmd, HiveConf conf) {
   }
 
   public static void clean(HiveConf conf) {
-    Driver drv = mapDrivers.get(conf);
-    if (drv != null) {
-      drv.destroy();
+    synchronized(mapDrivers) {
+      Driver drv = mapDrivers.get(conf);
+      if (drv != null) {
+        drv.destroy();
+      }
+      mapDrivers.remove(conf);
     }
-
-    mapDrivers.remove(conf);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
index e79cf9f..b6fd7e2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
@@ -35,9 +35,9 @@
   static final private Log LOG = LogFactory.getLog(StatsFactory.class.getName());
 
-  private static Class publisherImplementation;
-  private static Class aggregatorImplementation;
-  private static Configuration jobConf;
+  private static ThreadLocal<Class> publisherImplementation = new ThreadLocal<Class>();
+  private static ThreadLocal<Class> aggregatorImplementation = new ThreadLocal<Class>();
+  private static ThreadLocal<Configuration> jobConf = new ThreadLocal<Configuration>();
 
   /**
    * Sets the paths of the implementation classes of publishing
@@ -51,11 +51,11 @@ public static boolean setImplementation(String configurationParam, Configuration
     if (configurationParam.equals(StatsSetupConst.HBASE_IMPL_CLASS_VAL)) {
       // Case: hbase
       try {
-        publisherImplementation = (Class)
-          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsPublisher", true, classLoader);
+        publisherImplementation.set((Class)
+          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsPublisher", true, classLoader));
 
-        aggregatorImplementation = (Class)
-          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsAggregator", true, classLoader);
+        aggregatorImplementation.set((Class)
+          Class.forName("org.apache.hadoop.hive.hbase.HBaseStatsAggregator", true, classLoader));
       } catch (ClassNotFoundException e) {
         LOG.error("HBase Publisher/Aggregator classes cannot be loaded.", e);
         return false;
@@ -63,11 +63,11 @@ public static boolean setImplementation(String configurationParam, Configuration
     } else if (configurationParam.contains(StatsSetupConst.JDBC_IMPL_CLASS_VAL)) {
       // Case: jdbc:mysql or jdbc:derby
       try {
-        publisherImplementation = (Class)
-          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher", true, classLoader);
+        publisherImplementation.set((Class)
+          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsPublisher", true, classLoader));
 
-        aggregatorImplementation = (Class)
-          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator", true, classLoader);
+        aggregatorImplementation.set((Class)
+          Class.forName("org.apache.hadoop.hive.ql.stats.jdbc.JDBCStatsAggregator", true, classLoader));
       } catch (ClassNotFoundException e) {
         LOG.error("JDBC Publisher/Aggregator classes cannot be loaded.", e);
         return false;
@@ -81,17 +81,17 @@ public static boolean setImplementation(String configurationParam, Configuration
       return false;
     }
     try{
-      publisherImplementation = (Class)
-        Class.forName(defPublisher, true, classLoader);
-      aggregatorImplementation = (Class)
-        Class.forName(defAggregator, true, classLoader);
+      publisherImplementation.set((Class)
+        Class.forName(defPublisher, true, classLoader));
+      aggregatorImplementation.set((Class)
+        Class.forName(defAggregator, true, classLoader));
     } catch (ClassNotFoundException e) {
       LOG.error("JDBC Publisher/Aggregator classes cannot be loaded.", e);
      return false;
     }
    }
 
-    jobConf = conf;
+    jobConf.set(conf);
     return true;
   }
 
@@ -100,8 +100,7 @@ public static boolean setImplementation(String configurationParam, Configuration
    * For example HBaseStatsPublisher for the HBase implementation
    */
   public static StatsPublisher getStatsPublisher() {
-
-    return (StatsPublisher) ReflectionUtils.newInstance(publisherImplementation, jobConf);
+    return (StatsPublisher) ReflectionUtils.newInstance(publisherImplementation.get(), jobConf.get());
   }
 
   /**
@@ -109,8 +108,7 @@ public static StatsPublisher getStatsPublisher() {
   * For example HBaseStatsAggregator for the HBase implementation
   */
   public static StatsAggregator getStatsAggregator() {
-
-    return (StatsAggregator) ReflectionUtils.newInstance(aggregatorImplementation, jobConf);
+    return (StatsAggregator) ReflectionUtils.newInstance(aggregatorImplementation.get(), jobConf.get());
   }
 }
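
CommandProcessorFactory above now performs its check-then-put on mapDrivers inside a synchronized block, so two threads sharing a HiveConf cannot both miss the lookup and construct duplicate Driver instances, and clean() removes entries under the same lock. A standalone sketch of that guarded-cache pattern; the GuardedCache class and its Factory callback are illustrative stand-ins for mapDrivers and new Driver().

    import java.util.HashMap;
    import java.util.Map;

    class GuardedCache<K, V> {
      interface Factory<T> {
        T create();
      }

      private final Map<K, V> cache = new HashMap<K, V>();

      V getOrCreate(K key, Factory<V> factory) {
        synchronized (cache) {      // lookup and insert happen under one lock,
          V v = cache.get(key);     // so concurrent callers cannot both miss
          if (v == null) {          // and create two values for the same key
            v = factory.create();
            cache.put(key, v);
          }
          return v;
        }
      }

      void remove(K key) {
        synchronized (cache) {
          cache.remove(key);
        }
      }
    }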