commit feb0d7a38ec6637c67ccecd27c3a58b0e628ad69 Author: Misha Dmitriev Date: Wed Mar 1 13:20:49 2017 -0800 Intern Properties objects referenced from PartitionDesc to reduce memory pressure. When multiple concurrent Hive queries run, a separate copy of org.apache.hadoop.hive.ql.metadata.Partition and ql.plan.PartitionDesc is created for each table partition in each query instance. So when, in the benchmark explained in HIVE-16079, we have 2000 partitions and 50 concurrent queries running over them, we end up, in the worst case, with 2000*50=100,000 instances of Partition and PartitionDesc in memory. These objects themselves collectively take just ~2% of memory. However, the other data structures that each of them references take a lot more. In particular, Properties objects take more than 20% of memory. When we have 50 concurrent read-only queries, there are 50 identical copies of Properties for each partition. That is a huge waste of memory. This change introduces a new class that extends Properties, called CopyOnFirstWriteProperties. It uses a unique interned copy of Properties whenever possible; however, as soon as one of the methods that modify properties is called, a private copy is created. When this class is used, memory consumption by Properties falls from 20% to 5-6%.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.common;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * A special subclass of Properties, designed to save memory when many identical
 * copies of Properties would otherwise be created. To achieve that, we use the
 * 'interned' field, which points to the same Properties object for all instances
 * of CopyOnFirstWriteProperties that were created with identical contents.
 * However, as soon as any mutating method is called, contents are copied from
 * the 'interned' properties into this instance, and 'interned' is set to null.
 *
 * <p>Read-only methods delegate to 'interned' while it is non-null, so an
 * unmutated instance stores nothing in its own Hashtable. NOTE(review): the
 * collection views returned by {@link #entrySet()}, {@link #keySet()} and
 * {@link #values()} are views over the SHARED interned object — mutating
 * through them would corrupt every instance sharing that copy; callers are
 * assumed not to do so (TODO confirm against callers).</p>
 */
public class CopyOnFirstWriteProperties extends Properties {

  /** Shared interned copy; null once this instance has been mutated (copied-on-write). */
  private Properties interned;

  /** Weak interner: identical Properties map to one canonical shared instance. */
  private static final Interner<Properties> INTERNER = Interners.newWeakInterner();

  /**
   * Reflective access to Properties.defaults: it is protected, so we can read it
   * on 'this' but not on another Properties object (see copyFromInternedToThis).
   */
  private static final Field DEFAULTS_FIELD;
  static {
    try {
      DEFAULTS_FIELD = Properties.class.getDeclaredField("defaults");
      DEFAULTS_FIELD.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public CopyOnFirstWriteProperties(Properties p) {
    setInterned(p);
  }

  /************* Public API of java.util.Properties ************/

  @Override
  public String getProperty(String key) {
    if (interned != null) return interned.getProperty(key);
    else return super.getProperty(key);
  }

  @Override
  public String getProperty(String key, String defaultValue) {
    if (interned != null) return interned.getProperty(key, defaultValue);
    else return super.getProperty(key, defaultValue);
  }

  @Override
  public void list(PrintStream out) {
    if (interned != null) interned.list(out);
    else super.list(out);
  }

  @Override
  public void list(PrintWriter out) {
    if (interned != null) interned.list(out);
    else super.list(out);
  }

  @Override
  public synchronized void load(InputStream inStream) throws IOException {
    if (interned != null) copyFromInternedToThis();
    super.load(inStream);
  }

  @Override
  public synchronized void load(Reader reader) throws IOException {
    if (interned != null) copyFromInternedToThis();
    super.load(reader);
  }

  @Override
  public synchronized void loadFromXML(InputStream inStream) throws IOException {
    if (interned != null) copyFromInternedToThis();
    super.loadFromXML(inStream);
  }

  @Override
  public Enumeration<?> propertyNames() {
    if (interned != null) return interned.propertyNames();
    else return super.propertyNames();
  }

  @Override
  public synchronized Object setProperty(String key, String value) {
    if (interned != null) copyFromInternedToThis();
    return super.setProperty(key, value);
  }

  @Override
  public void store(OutputStream out, String comments) throws IOException {
    if (interned != null) interned.store(out, comments);
    else super.store(out, comments);
  }

  // Bug fix: without this override, storing an unmutated instance through a
  // Writer would consult this (empty) Hashtable and emit nothing.
  @Override
  public void store(Writer writer, String comments) throws IOException {
    if (interned != null) interned.store(writer, comments);
    else super.store(writer, comments);
  }

  @Override
  public void storeToXML(OutputStream os, String comment) throws IOException {
    if (interned != null) interned.storeToXML(os, comment);
    else super.storeToXML(os, comment);
  }

  @Override
  public void storeToXML(OutputStream os, String comment, String encoding)
      throws IOException {
    if (interned != null) interned.storeToXML(os, comment, encoding);
    else super.storeToXML(os, comment, encoding);
  }

  @Override
  public Set<String> stringPropertyNames() {
    if (interned != null) return interned.stringPropertyNames();
    else return super.stringPropertyNames();
  }

  /************* Public API of java.util.Hashtable ************/

  @Override
  public synchronized void clear() {
    if (interned != null) copyFromInternedToThis();
    super.clear();
  }

  @Override
  public synchronized Object clone() {
    // A clone of an unmutated instance can keep sharing the interned copy.
    if (interned != null) return new CopyOnFirstWriteProperties(interned);
    else return super.clone();
  }

  @Override
  public synchronized Object compute(Object key,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    if (interned != null) copyFromInternedToThis(); // We do this because if function returns null,
                                                    // the mapping for key is removed, i.e. the table is mutated.
    return super.compute(key, remappingFunction);
  }

  @Override
  public synchronized Object computeIfAbsent(Object key,
      Function<? super Object, ? extends Object> mappingFunction) {
    if (interned != null) copyFromInternedToThis();
    return super.computeIfAbsent(key, mappingFunction);
  }

  @Override
  public synchronized Object computeIfPresent(Object key,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    if (interned != null) copyFromInternedToThis();
    return super.computeIfPresent(key, remappingFunction);
  }

  @Override
  public synchronized boolean contains(Object value) {
    if (interned != null) return interned.contains(value);
    else return super.contains(value);
  }

  @Override
  public synchronized boolean containsKey(Object key) {
    if (interned != null) return interned.containsKey(key);
    else return super.containsKey(key);
  }

  @Override
  public synchronized boolean containsValue(Object value) {
    if (interned != null) return interned.containsValue(value);
    else return super.containsValue(value);
  }

  @Override
  public synchronized Enumeration<Object> elements() {
    if (interned != null) return interned.elements();
    else return super.elements();
  }

  @Override
  public Set<Map.Entry<Object, Object>> entrySet() {
    if (interned != null) return interned.entrySet();
    else return super.entrySet();
  }

  @Override
  public synchronized boolean equals(Object o) {
    if (interned != null) return interned.equals(o);
    else return super.equals(o);
  }

  @Override
  public synchronized void forEach(BiConsumer<? super Object, ? super Object> action) {
    if (interned != null) interned.forEach(action);
    else super.forEach(action);
  }

  @Override
  public synchronized Object get(Object key) {
    if (interned != null) return interned.get(key);
    else return super.get(key);
  }

  @Override
  public synchronized Object getOrDefault(Object key, Object defaultValue) {
    if (interned != null) return interned.getOrDefault(key, defaultValue);
    else return super.getOrDefault(key, defaultValue);
  }

  @Override
  public synchronized int hashCode() {
    if (interned != null) return interned.hashCode();
    else return super.hashCode();
  }

  @Override
  public synchronized boolean isEmpty() {
    if (interned != null) return interned.isEmpty();
    else return super.isEmpty();
  }

  @Override
  public synchronized Enumeration<Object> keys() {
    if (interned != null) return interned.keys();
    else return super.keys();
  }

  @Override
  public Set<Object> keySet() {
    if (interned != null) return interned.keySet();
    else return super.keySet();
  }

  @Override
  public synchronized Object merge(Object key, Object value,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    if (interned != null) copyFromInternedToThis();
    return super.merge(key, value, remappingFunction);
  }

  @Override
  public synchronized Object put(Object key, Object value) {
    if (interned != null) copyFromInternedToThis();
    return super.put(key, value);
  }

  @Override
  public synchronized void putAll(Map<? extends Object, ? extends Object> t) {
    if (interned != null) copyFromInternedToThis();
    super.putAll(t);
  }

  @Override
  public synchronized Object putIfAbsent(Object key, Object value) {
    if (interned != null) copyFromInternedToThis();
    return super.putIfAbsent(key, value);
  }

  @Override
  public synchronized Object remove(Object key) {
    if (interned != null) copyFromInternedToThis();
    return super.remove(key);
  }

  @Override
  public synchronized boolean remove(Object key, Object value) {
    if (interned != null) copyFromInternedToThis();
    return super.remove(key, value);
  }

  @Override
  public synchronized Object replace(Object key, Object value) {
    if (interned != null) copyFromInternedToThis();
    return super.replace(key, value);
  }

  @Override
  public synchronized boolean replace(Object key, Object oldValue, Object newValue) {
    if (interned != null) copyFromInternedToThis();
    return super.replace(key, oldValue, newValue);
  }

  @Override
  public synchronized void replaceAll(
      BiFunction<? super Object, ? super Object, ? extends Object> function) {
    if (interned != null) copyFromInternedToThis();
    super.replaceAll(function);
  }

  @Override
  public synchronized int size() {
    if (interned != null) return interned.size();
    else return super.size();
  }

  @Override
  public synchronized String toString() {
    if (interned != null) return interned.toString();
    else return super.toString();
  }

  @Override
  public Collection<Object> values() {
    if (interned != null) return interned.values();
    else return super.values();
  }

  /************* Private implementation ************/

  /**
   * Detaches this instance from the shared interned copy: copies all entries
   * (and the 'defaults' chain) into this object's own table, then clears
   * 'interned' so subsequent calls operate on the private copy.
   */
  private void copyFromInternedToThis() {
    for (Map.Entry<Object, Object> e : interned.entrySet()) {
      super.put(e.getKey(), e.getValue());
    }
    try {
      // Unfortunately, we cannot directly read a protected field of a non-this object,
      // so the interned object's defaults are fetched reflectively.
      this.defaults = (Properties) DEFAULTS_FIELD.get(interned);
    } catch (IllegalAccessException e) { // Shouldn't happen
      throw new RuntimeException(e);
    }
    setInterned(null);
  }

  /**
   * Points this instance at the canonical interned copy of the given Properties
   * (or clears it when p is null). Public because the Kryo serializer uses it.
   */
  public void setInterned(Properties p) {
    if (p != null) {
      this.interned = INTERNER.intern(p);
    } else {
      this.interned = null;
    }
  }

  // These methods are required by serialization

  public CopyOnFirstWriteProperties() {
  }

  /** Returns the shared interned copy, or null if this instance has been mutated. */
  public Properties getInterned() {
    return interned;
  }
}
org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -223,6 +225,7 @@ public Kryo create() { kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); + kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer()); ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy( @@ -422,6 +425,33 @@ public void write(final Kryo kryo, final Output output, final List obj) { } /** + * CopyOnFirstWriteProperties needs a special serializer, since it extends Properties, + * which implements Map, so MapSerializer would be used for it by default. Yet it has + * the additional 'interned' field that the standard MapSerializer doesn't handle + * properly. But FieldSerializer doesn't work for it as well, because the Hashtable + * superclass declares most of its fields transient. + */ + private static class CopyOnFirstWritePropertiesSerializer extends + com.esotericsoftware.kryo.serializers.MapSerializer { + + @Override + public void write(Kryo kryo, Output output, Map map) { + super.write(kryo, output, map); + CopyOnFirstWriteProperties p = (CopyOnFirstWriteProperties) map; + Properties ip = p.getInterned(); + kryo.writeObjectOrNull(output, ip, Properties.class); + } + + @Override + public Map read(Kryo kryo, Input input, Class type) { + Map map = super.read(kryo, input, type); + Properties ip = kryo.readObjectOrNull(input, Properties.class); + ((CopyOnFirstWriteProperties) map).setInterned(ip); + return map; + } + } + + /** * Serializes the plan. * * @param plan The plan, such as QueryPlan, MapredWork, etc. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index d05c1c68fdb7296c0346d73967071da1ebe7bb72..68dcd0d4110f6f87c8171cef2d43fc992f0d0950 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -55,13 +56,7 @@ @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class PartitionDesc implements Serializable, Cloneable { - static { - STRING_INTERNER = Interners.newWeakInterner(); - CLASS_INTERNER = Interners.newWeakInterner(); - } - - private static final Interner STRING_INTERNER; - private static final Interner> CLASS_INTERNER; + private static final Interner> CLASS_INTERNER = Interners.newWeakInterner(); private TableDesc tableDesc; private LinkedHashMap partSpec; @@ -220,8 +215,12 @@ public Map getPropertiesExplain() { } public void setProperties(final Properties properties) { - internProperties(properties); - this.properties = properties; + if (properties instanceof CopyOnFirstWriteProperties) { + this.properties = properties; + } else { + internProperties(properties); + this.properties = new CopyOnFirstWriteProperties(properties); + } } private static TableDesc getTableDesc(Table table) { @@ -235,8 +234,7 @@ private static void internProperties(Properties properties) { String key = (String) keys.nextElement(); String oldValue = properties.getProperty(key); if (oldValue != null) { - String value = STRING_INTERNER.intern(oldValue); - properties.setProperty(key, value); + properties.setProperty(key, 
oldValue.intern()); } } } @@ -280,13 +278,7 @@ public PartitionDesc clone() { ret.inputFileFormatClass = inputFileFormatClass; ret.outputFileFormatClass = outputFileFormatClass; if (properties != null) { - Properties newProp = new Properties(); - Enumeration keysProp = properties.keys(); - while (keysProp.hasMoreElements()) { - Object key = keysProp.nextElement(); - newProp.put(key, properties.get(key)); - } - ret.setProperties(newProp); + ret.setProperties((Properties) properties.clone()); } ret.tableDesc = (TableDesc) tableDesc.clone(); // The partition spec is not present