diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java index b45a9de..1c3cad2 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.serde2.objectinspector; - import java.lang.reflect.GenericArrayType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -29,15 +28,15 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; -import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; import org.apache.thrift.TUnion; -import java.util.concurrent.TimeUnit; + import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; /** * ObjectInspectorFactory is the primary way to create new ObjectInspector * instances. @@ -64,9 +63,30 @@ */ public enum ObjectInspectorOptions { JAVA, THRIFT, PROTOCOL_BUFFERS, AVRO - }; + } - static ConcurrentHashMap objectInspectorCache = new ConcurrentHashMap(); + // guava cache builder does not support generics, so reuse builder + private static CacheBuilder boundedBuilder = CacheBuilder.newBuilder() + .initialCapacity(1024) + .maximumSize(10240) // 10x initial capacity + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterAccess(5, TimeUnit.MINUTES) + .softValues(); + + private static CacheBuilder unboundedBuilder = CacheBuilder.newBuilder() + .initialCapacity(1024) + .concurrencyLevel(Runtime.getRuntime().availableProcessors()) + .expireAfterAccess(5, TimeUnit.MINUTES) + .softValues(); + + // if this is made bounded (with eviction), type == type may not be oi == oi + static Cache objectInspectorCache = unboundedBuilder.build(); + static Cache, UnionStructObjectInspector> cachedUnionStructObjectInspector = boundedBuilder.build(); + static Cache cachedStandardListObjectInspector = boundedBuilder.build(); + static Cache, StandardMapObjectInspector> cachedStandardMapObjectInspector = boundedBuilder.build(); + static Cache, StandardUnionObjectInspector> cachedStandardUnionObjectInspector = boundedBuilder.build(); + static Cache>, StandardStructObjectInspector> cachedStandardStructObjectInspector = boundedBuilder.build(); + static Cache, ColumnarStructObjectInspector> cachedColumnarStructObjectInspector = boundedBuilder.build(); public static ObjectInspector getReflectionObjectInspector(Type t, ObjectInspectorOptions options) { @@ -75,10 +95,10 @@ public static ObjectInspector getReflectionObjectInspector(Type t, static ObjectInspector getReflectionObjectInspector(Type t, ObjectInspectorOptions options, boolean ensureInited) { - ObjectInspector oi = objectInspectorCache.get(t); + ObjectInspector oi = objectInspectorCache.asMap().get(t); if (oi == null) { oi = getReflectionObjectInspectorNoCache(t, options, ensureInited); - ObjectInspector prev = objectInspectorCache.putIfAbsent(t, oi); + ObjectInspector prev = objectInspectorCache.asMap().putIfAbsent(t, oi); if (prev != null) { oi = prev; } @@ -216,7 +236,7 @@ private static ObjectInspector getReflectionObjectInspectorNoCache(Type t, // put it into the cache BEFORE it is initialized to make sure we can catch // recursive types. ReflectionStructObjectInspector prev = - (ReflectionStructObjectInspector) objectInspectorCache.putIfAbsent(t, oi); + (ReflectionStructObjectInspector) objectInspectorCache.asMap().putIfAbsent(t, oi); if (prev != null) { oi = prev; } else { @@ -225,7 +245,7 @@ private static ObjectInspector getReflectionObjectInspectorNoCache(Type t, } finally { if (!oi.inited) { // Failed to init, remove it from cache - objectInspectorCache.remove(t, oi); + objectInspectorCache.asMap().remove(t, oi); } } } @@ -233,17 +253,14 @@ private static ObjectInspector getReflectionObjectInspectorNoCache(Type t, } - static ConcurrentHashMap - cachedStandardListObjectInspector = new ConcurrentHashMap(); - public static StandardListObjectInspector getStandardListObjectInspector( ObjectInspector listElementObjectInspector) { - StandardListObjectInspector result = cachedStandardListObjectInspector + StandardListObjectInspector result = cachedStandardListObjectInspector.asMap() .get(listElementObjectInspector); if (result == null) { result = new StandardListObjectInspector(listElementObjectInspector); StandardListObjectInspector prev = - cachedStandardListObjectInspector.putIfAbsent(listElementObjectInspector, result); + cachedStandardListObjectInspector.asMap().putIfAbsent(listElementObjectInspector, result); if (prev != null) { result = prev; } @@ -256,22 +273,19 @@ public static StandardConstantListObjectInspector getStandardConstantListObjectI return new StandardConstantListObjectInspector(listElementObjectInspector, constantValue); } - static ConcurrentHashMap, StandardMapObjectInspector> cachedStandardMapObjectInspector = - new ConcurrentHashMap, StandardMapObjectInspector>(); - public static StandardMapObjectInspector getStandardMapObjectInspector( ObjectInspector mapKeyObjectInspector, ObjectInspector mapValueObjectInspector) { ArrayList signature = new ArrayList(2); signature.add(mapKeyObjectInspector); signature.add(mapValueObjectInspector); - StandardMapObjectInspector result = cachedStandardMapObjectInspector + StandardMapObjectInspector result = cachedStandardMapObjectInspector.asMap() .get(signature); if (result == null) { result = new StandardMapObjectInspector(mapKeyObjectInspector, mapValueObjectInspector); StandardMapObjectInspector prev = - cachedStandardMapObjectInspector.putIfAbsent(signature, result); + cachedStandardMapObjectInspector.asMap().putIfAbsent(signature, result); if (prev != null) { result = prev; } @@ -287,18 +301,14 @@ public static StandardConstantMapObjectInspector getStandardConstantMapObjectIns mapValueObjectInspector, constantValue); } - static ConcurrentHashMap, StandardUnionObjectInspector> - cachedStandardUnionObjectInspector = - new ConcurrentHashMap, StandardUnionObjectInspector>(); - public static StandardUnionObjectInspector getStandardUnionObjectInspector( List unionObjectInspectors) { - StandardUnionObjectInspector result = cachedStandardUnionObjectInspector + StandardUnionObjectInspector result = cachedStandardUnionObjectInspector.asMap() .get(unionObjectInspectors); if (result == null) { result = new StandardUnionObjectInspector(unionObjectInspectors); StandardUnionObjectInspector prev = - cachedStandardUnionObjectInspector.putIfAbsent(unionObjectInspectors, result); + cachedStandardUnionObjectInspector.asMap().putIfAbsent(unionObjectInspectors, result); if (prev != null) { result = prev; } @@ -306,9 +316,6 @@ public static StandardUnionObjectInspector getStandardUnionObjectInspector( return result; } - static ConcurrentHashMap>, StandardStructObjectInspector> cachedStandardStructObjectInspector = - new ConcurrentHashMap>, StandardStructObjectInspector>(); - public static StandardStructObjectInspector getStandardStructObjectInspector( List structFieldNames, List structFieldObjectInspectors) { @@ -327,11 +334,11 @@ public static StandardStructObjectInspector getStandardStructObjectInspector( StringInternUtils.internStringsInList(structComments); signature.add(structComments); } - StandardStructObjectInspector result = cachedStandardStructObjectInspector.get(signature); + StandardStructObjectInspector result = cachedStandardStructObjectInspector.asMap().get(signature); if (result == null) { result = new StandardStructObjectInspector(structFieldNames, structFieldObjectInspectors, structComments); StandardStructObjectInspector prev = - cachedStandardStructObjectInspector.putIfAbsent(signature, result); + cachedStandardStructObjectInspector.asMap().putIfAbsent(signature, result); if (prev != null) { result = prev; } @@ -345,12 +352,6 @@ public static StandardConstantStructObjectInspector getStandardConstantStructObj return new StandardConstantStructObjectInspector(structFieldNames, structFieldObjectInspectors, value); } - static Cache, UnionStructObjectInspector> cachedUnionStructObjectInspector = CacheBuilder.newBuilder() - .initialCapacity(1024) - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .expireAfterAccess(5,TimeUnit.MINUTES) - .softValues() - .build(); public static UnionStructObjectInspector getUnionStructObjectInspector( List structObjectInspectors) { @@ -362,9 +363,6 @@ public static UnionStructObjectInspector getUnionStructObjectInspector( return result; } - static ConcurrentHashMap, ColumnarStructObjectInspector> cachedColumnarStructObjectInspector = - new ConcurrentHashMap, ColumnarStructObjectInspector>(); - public static ColumnarStructObjectInspector getColumnarStructObjectInspector( List structFieldNames, List structFieldObjectInspectors) { @@ -380,13 +378,13 @@ public static ColumnarStructObjectInspector getColumnarStructObjectInspector( if(structFieldComments != null) { signature.add(structFieldComments); } - ColumnarStructObjectInspector result = cachedColumnarStructObjectInspector + ColumnarStructObjectInspector result = cachedColumnarStructObjectInspector.asMap() .get(signature); if (result == null) { result = new ColumnarStructObjectInspector(structFieldNames, structFieldObjectInspectors, structFieldComments); ColumnarStructObjectInspector prev = - cachedColumnarStructObjectInspector.putIfAbsent(signature, result); + cachedColumnarStructObjectInspector.asMap().putIfAbsent(signature, result); if (prev != null) { result = prev; } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java index 3a88a40..d31cd3a 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java @@ -121,7 +121,7 @@ public String getTypeName() { */ protected boolean isFullyInited(Set checkedTypes) { if (type != null && // when type is not set, init hasn't been called yet - ObjectInspectorFactory.objectInspectorCache.get(type) != this) { + ObjectInspectorFactory.objectInspectorCache.asMap().get(type) != this) { // This object should be the same as in cache, otherwise, it must be removed due to init error throw new RuntimeException("Cached object inspector is gone while waiting for it to initialize"); } diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java index cb1440e..2faf340 100644 --- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java +++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java @@ -31,8 +31,13 @@ import org.apache.commons.lang.mutable.MutableObject; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.thrift.test.Complex; +import org.junit.Test; + +import com.google.common.collect.Lists; import junit.framework.TestCase; @@ -42,6 +47,7 @@ */ public class TestReflectionObjectInspectors extends TestCase { + @Test public void testReflectionObjectInspectors() throws Throwable { try { ObjectInspector oi1 = ObjectInspectorFactory @@ -110,6 +116,24 @@ public void testReflectionObjectInspectors() throws Throwable { } } + @Test + public void testObjectInspectorMaxCacheSize() { + int maxSize = 10240; + for (int i = 0; i < maxSize; i++) { + ObjectInspectorFactory + .getStandardUnionObjectInspector(Lists.newArrayList(new JavaConstantStringObjectInspector("" + i))); + } + assertTrue("Got: " + ObjectInspectorFactory.cachedStandardUnionObjectInspector.size(), + ObjectInspectorFactory.cachedStandardUnionObjectInspector.size() <= maxSize); + for (int i = 0; i < 1000; i++) { + ObjectInspectorFactory.getStandardUnionObjectInspector(Lists.newArrayList(new + JavaConstantStringObjectInspector("" + (10240 + i)))); + } + assertTrue("Got: " + ObjectInspectorFactory.cachedStandardUnionObjectInspector.size(), ObjectInspectorFactory + .cachedStandardUnionObjectInspector.size() <= maxSize); + } + + @Test public void testObjectInspectorThreadSafety() throws InterruptedException { final int workerCount = 5; // 5 workers to run getReflectionObjectInspector concurrently final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(workerCount); @@ -129,7 +153,7 @@ public void run() { try { for (int i = 0; i < 20; i++) { // repeat 20 times for (final ObjectPair t: types) { - ObjectInspectorFactory.objectInspectorCache.clear(); + ObjectInspectorFactory.objectInspectorCache.asMap().clear(); for (int k = 0; k < workerCount; k++) { results[k] = executorService.schedule(new Callable() { @Override