diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java index 2c0e3c2..d5cc9a5 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/AccumuloPredicateHandler.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hive.accumulo.predicate.compare.PrimitiveComparison; import org.apache.hadoop.hive.accumulo.predicate.compare.StringCompare; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; @@ -358,7 +358,7 @@ public ExprNodeDesc getExpression(Configuration conf) { if (filteredExprSerialized == null) return null; - return Utilities.deserializeExpression(filteredExprSerialized); + return SerializationUtilities.deserializeExpression(filteredExprSerialized); } /** diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java index 15ccda7..88e4530 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/predicate/TestAccumuloPredicateHandler.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import org.apache.hadoop.hive.accumulo.serde.TooManyAccumuloColumnsException; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -117,7 +117,7 @@ public void testGetRowIDSearchCondition() { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqual(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List sConditions = handler.getSearchConditions(conf); @@ -134,7 +134,7 @@ public void testRangeEqual() throws SerDeException { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqual(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -157,7 +157,7 @@ public void testRangeGreaterThan() throws SerDeException { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPGreaterThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); 
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -182,7 +182,7 @@ public void rangeGreaterThanOrEqual() throws SerDeException { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqualOrGreaterThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -206,7 +206,7 @@ public void rangeLessThan() throws SerDeException { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPLessThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -231,7 +231,7 @@ public void rangeLessThanOrEqual() throws SerDeException { ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqualOrLessThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -273,7 +273,7 @@ public void testDisjointRanges() throws SerDeException { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); Collection ranges = handler.getRanges(conf, columnMapper); @@ -309,7 +309,7 @@ public void testMultipleRanges() throws SerDeException { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List ranges = handler.getRanges(conf, columnMapper); @@ -329,7 +329,7 @@ public void testPushdownTuple() throws SerDeException, NoSuchPrimitiveComparison ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqual(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List sConditions = handler.getSearchConditions(conf); @@ -356,7 +356,7 @@ public void testPushdownColumnTypeNotSupported() throws SerDeException, ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPEqual(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List sConditions = handler.getSearchConditions(conf); assertEquals(sConditions.size(), 1); @@ -375,7 +375,7 @@ public void testPushdownComparisonOptNotSupported() { 
ExprNodeGenericFuncDesc node = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPNotNull(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List sConditions = handler.getSearchConditions(conf); assertEquals(sConditions.size(), 1); @@ -417,7 +417,7 @@ public void testIteratorIgnoreRowIDFields() { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); try { List iterators = handler.getIterators(conf, columnMapper); @@ -468,7 +468,7 @@ public void testIgnoreIteratorPushdown() throws TooManyAccumuloColumnsException ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); conf.setBoolean(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, false); try { @@ -519,7 +519,7 @@ public void testCreateIteratorSettings() throws Exception { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); List iterators = handler.getIterators(conf, columnMapper); assertEquals(iterators.size(), 2); @@ -665,7 +665,7 @@ public void testRowRangeIntersection() throws SerDeException { ExprNodeGenericFuncDesc both = new ExprNodeGenericFuncDesc(TypeInfoFactory.stringTypeInfo, new GenericUDFOPAnd(), bothFilters); - String filterExpr = Utilities.serializeExpression(both); + String filterExpr = SerializationUtilities.serializeExpression(both); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); // Should make ['f', 'm\0') @@ -697,7 +697,7 @@ public void testRowRangeGeneration() throws SerDeException { new GenericUDFOPLessThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); // Should make (100, +inf) @@ -738,7 +738,7 @@ public void testBinaryRangeGeneration() throws Exception { new GenericUDFOPLessThan(), children); assertNotNull(node); - String filterExpr = Utilities.serializeExpression(node); + String filterExpr = SerializationUtilities.serializeExpression(node); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, filterExpr); // Should make (100, +inf) diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java index b17714f..88d1865 100644 --- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java +++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java @@ -20,12 +20,9 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import 
org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -39,7 +36,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.hbase.ColumnMappings.ColumnMapping; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -74,6 +71,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler @@ -187,7 +186,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean Scan scan = new Scan(); String filterObjectSerialized = jobConf.get(TableScanDesc.FILTER_OBJECT_CONF_STR); if (filterObjectSerialized != null) { - HBaseScanRange range = Utilities.deserializeObject(filterObjectSerialized, + HBaseScanRange range = SerializationUtilities.deserializeObject(filterObjectSerialized, HBaseScanRange.class); try { range.setup(scan, jobConf); @@ -203,7 +202,7 @@ private Scan createFilterScan(JobConf jobConf, int iKey, int iTimestamp, boolean } ExprNodeGenericFuncDesc filterExpr = - Utilities.deserializeExpression(filterExprSerialized); + SerializationUtilities.deserializeExpression(filterExprSerialized); String keyColName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey]; String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey]; diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java index 41571fc..4ab497e 100644 --- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java +++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java @@ -18,9 +18,15 @@ */ package org.apache.hive.hcatalog.api; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; @@ -47,7 +53,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -68,13 +74,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import 
java.util.Iterator; -import java.util.List; -import java.util.Map; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * The HCatClientHMSImpl is the Hive Metastore client based implementation of @@ -584,7 +586,7 @@ private void dropPartitionsUsingExpressions(Table table, Map par ExprNodeGenericFuncDesc partitionExpression = new ExpressionBuilder(table, partitionSpec).build(); ObjectPair serializedPartitionExpression = new ObjectPair(partitionSpec.size(), - Utilities.serializeExpressionToKryo(partitionExpression)); + SerializationUtilities.serializeExpressionToKryo(partitionExpression)); hmsClient.dropPartitions(table.getDbName(), table.getTableName(), Arrays.asList(serializedPartitionExpression), deleteData && !isExternal(table), // Delete data? ifExists, // Fail if table doesn't exist? diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 6713a2f..5b2c8c2 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -57,6 +57,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -1352,7 +1353,7 @@ public int checkPlan(String tname, List> tasks) thr try { conf.set(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "javaXML"); for (Task plan : tasks) { - Utilities.serializePlan(plan, ofs, conf); + SerializationUtilities.serializePlan(plan, ofs, conf); } ofs.close(); fixXml4JDK7(outf.getPath()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index cab0fc8..fbc5ea4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -21,16 +21,12 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; @@ -68,6 +64,8 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.ReflectionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Map side Join operator implementation. 
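
For context, a minimal sketch (not part of the patch) of the caller-side pattern the hunks above migrate: a storage handler serializes a pushed-down filter with SerializationUtilities.serializeExpression, stores it under TableScanDesc.FILTER_EXPR_CONF_STR, and later reads it back with SerializationUtilities.deserializeExpression. The class and method names below (FilterExprRoundTrip, pushFilter, pullFilter) are hypothetical; the API calls are the ones used in this diff.

// Hypothetical illustration only -- not part of the patch.
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class FilterExprRoundTrip {
  // Client side: serialize the predicate and hand it to the job, as the
  // Accumulo/HBase handlers and their tests above now do via SerializationUtilities.
  static void pushFilter(JobConf jobConf, ExprNodeGenericFuncDesc filter) {
    String serialized = SerializationUtilities.serializeExpression(filter);
    jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR, serialized);
  }

  // Task side: read the filter back, or return null when no filter was pushed down.
  static ExprNodeGenericFuncDesc pullFilter(JobConf jobConf) {
    String serialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    return serialized == null ? null : SerializationUtilities.deserializeExpression(serialized);
  }
}
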
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java index 6c11637..721fbaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java @@ -40,7 +40,7 @@ import org.antlr.runtime.CommonToken; import org.antlr.runtime.tree.BaseTree; import org.antlr.runtime.tree.CommonTree; -import org.apache.hadoop.hive.ql.exec.Utilities.EnumDelegate; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities.EnumDelegate; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java new file mode 100644 index 0000000..d5e946e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -0,0 +1,730 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import java.beans.DefaultPersistenceDelegate; +import java.beans.Encoder; +import java.beans.ExceptionListener; +import java.beans.Expression; +import java.beans.PersistenceDelegate; +import java.beans.Statement; +import java.beans.XMLDecoder; +import java.beans.XMLEncoder; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.net.URI; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.antlr.runtime.CommonToken; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; +import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.GroupByDesc; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.Serializer; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.pool.KryoFactory; +import com.esotericsoftware.kryo.pool.KryoPool; +import com.esotericsoftware.kryo.serializers.FieldSerializer; + +/** + * Utilities related to serialization and deserialization. 
+ */ +public class SerializationUtilities { + private static final String CLASS_NAME = SerializationUtilities.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + private static KryoFactory factory = new KryoFactory() { + public Kryo create() { + Kryo kryo = new Kryo(); + kryo.register(java.sql.Date.class, new SqlDateSerializer()); + kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); + kryo.register(Path.class, new PathSerializer()); + kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); + ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) + .setFallbackInstantiatorStrategy( + new StdInstantiatorStrategy()); + removeField(kryo, Operator.class, "colExprMap"); + removeField(kryo, AbstractOperatorDesc.class, "statistics"); + kryo.register(MapWork.class); + kryo.register(ReduceWork.class); + kryo.register(TableDesc.class); + kryo.register(UnionOperator.class); + kryo.register(FileSinkOperator.class); + kryo.register(HiveIgnoreKeyTextOutputFormat.class); + kryo.register(StandardConstantListObjectInspector.class); + kryo.register(StandardConstantMapObjectInspector.class); + kryo.register(StandardConstantStructObjectInspector.class); + kryo.register(SequenceFileInputFormat.class); + kryo.register(HiveSequenceFileOutputFormat.class); + kryo.register(SparkEdgeProperty.class); + kryo.register(SparkWork.class); + kryo.register(Pair.class); + return kryo; + } + }; + + // Bounded queue could be specified here but that will lead to blocking. + // ConcurrentLinkedQueue is unbounded and will release soft referenced kryo instances under + // memory pressure. + private static KryoPool kryoPool = new KryoPool.Builder(factory).softReferences().build(); + + /** + * By default, kryo pool uses ConcurrentLinkedQueue which is unbounded. To facilitate reuse of + * kryo object call releaseKryo() after done using the kryo instance. The class loader for the + * kryo instance will be set to current thread's context class loader. + * + * @return kryo instance + */ + public static Kryo borrowKryo() { + Kryo kryo = kryoPool.borrow(); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + return kryo; + } + + /** + * Release kryo instance back to the pool. + * + * @param kryo - kryo instance to be released + */ + public static void releaseKryo(Kryo kryo) { + kryoPool.release(kryo); + } + + private static void removeField(Kryo kryo, Class type, String fieldName) { + FieldSerializer fld = new FieldSerializer(kryo, type); + fld.removeField(fieldName); + kryo.register(type, fld); + } + + /** + * Kryo serializer for timestamp. 
+ */ + private static class TimestampSerializer extends + com.esotericsoftware.kryo.Serializer { + + @Override + public Timestamp read(Kryo kryo, Input input, Class clazz) { + Timestamp ts = new Timestamp(input.readLong()); + ts.setNanos(input.readInt()); + return ts; + } + + @Override + public void write(Kryo kryo, Output output, Timestamp ts) { + output.writeLong(ts.getTime()); + output.writeInt(ts.getNanos()); + } + } + + /** + * Custom Kryo serializer for sql date, otherwise Kryo gets confused between + * java.sql.Date and java.util.Date while deserializing + */ + private static class SqlDateSerializer extends + com.esotericsoftware.kryo.Serializer { + + @Override + public java.sql.Date read(Kryo kryo, Input input, Class clazz) { + return new java.sql.Date(input.readLong()); + } + + @Override + public void write(Kryo kryo, Output output, java.sql.Date sqlDate) { + output.writeLong(sqlDate.getTime()); + } + } + + private static class PathSerializer extends com.esotericsoftware.kryo.Serializer { + + @Override + public void write(Kryo kryo, Output output, Path path) { + output.writeString(path.toUri().toString()); + } + + @Override + public Path read(Kryo kryo, Input input, Class type) { + return new Path(URI.create(input.readString())); + } + } + + /** + * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}. + *
+ * Note: This serializer does not support cyclic references, so if one of the objects
+ * has the list set as an attribute, this might cause an error during deserialization.
+ *
+ *
+ * This is from kryo-serializers package. Added explicitly to avoid classpath issues. + */ + private static class ArraysAsListSerializer + extends com.esotericsoftware.kryo.Serializer> { + + private Field _arrayField; + + public ArraysAsListSerializer() { + try { + _arrayField = Class.forName("java.util.Arrays$ArrayList").getDeclaredField("a"); + _arrayField.setAccessible(true); + } catch (final Exception e) { + throw new RuntimeException(e); + } + // Immutable causes #copy(obj) to return the original object + setImmutable(true); + } + + @Override + public List read(final Kryo kryo, final Input input, final Class> type) { + final int length = input.readInt(true); + Class componentType = kryo.readClass(input).getType(); + if (componentType.isPrimitive()) { + componentType = getPrimitiveWrapperClass(componentType); + } + try { + final Object items = Array.newInstance(componentType, length); + for (int i = 0; i < length; i++) { + Array.set(items, i, kryo.readClassAndObject(input)); + } + return Arrays.asList((Object[]) items); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void write(final Kryo kryo, final Output output, final List obj) { + try { + final Object[] array = (Object[]) _arrayField.get(obj); + output.writeInt(array.length, true); + final Class componentType = array.getClass().getComponentType(); + kryo.writeClass(output, componentType); + for (final Object item : array) { + kryo.writeClassAndObject(output, item); + } + } catch (final RuntimeException e) { + // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... + // handles SerializationException specifically (resizing the buffer)... + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private Class getPrimitiveWrapperClass(final Class c) { + if (c.isPrimitive()) { + if (c.equals(Long.TYPE)) { + return Long.class; + } else if (c.equals(Integer.TYPE)) { + return Integer.class; + } else if (c.equals(Double.TYPE)) { + return Double.class; + } else if (c.equals(Float.TYPE)) { + return Float.class; + } else if (c.equals(Boolean.TYPE)) { + return Boolean.class; + } else if (c.equals(Character.TYPE)) { + return Character.class; + } else if (c.equals(Short.TYPE)) { + return Short.class; + } else if (c.equals(Byte.TYPE)) { + return Byte.class; + } + } + return c; + } + } + + + /** + * Serializes the plan. + * + * @param plan The plan, such as QueryPlan, MapredWork, etc. + * @param out The stream to write to. + * @param conf to pick which serialization format is desired. 
+ */ + public static void serializePlan(Object plan, OutputStream out, Configuration conf) { + serializePlan(plan, out, conf, false); + } + + public static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf) { + serializePlan(kryo, plan, out, conf, false); + } + + private static void serializePlan(Object plan, OutputStream out, Configuration conf, + boolean cloningPlan) { + Kryo kryo = borrowKryo(); + try { + serializePlan(kryo, plan, out, conf, cloningPlan); + } finally { + releaseKryo(kryo); + } + } + + private static void serializePlan(Kryo kryo, Object plan, OutputStream out, Configuration conf, + boolean cloningPlan) { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); + String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo"); + LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType); + if ("javaXML".equalsIgnoreCase(serializationType)) { + serializeObjectByJavaXML(plan, out); + } else { + if (cloningPlan) { + serializeObjectByKryo(kryo, plan, out); + } else { + serializeObjectByKryo(kryo, plan, out); + } + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); + } + + /** + * Deserializes the plan. + * + * @param in The stream to read from. + * @param planClass class of plan + * @param conf configuration + * @return The plan, such as QueryPlan, MapredWork, etc. + */ + public static T deserializePlan(InputStream in, Class planClass, Configuration conf) { + return deserializePlan(in, planClass, conf, false); + } + + public static T deserializePlan(Kryo kryo, InputStream in, Class planClass, + Configuration conf) { + return deserializePlan(kryo, in, planClass, conf, false); + } + + private static T deserializePlan(InputStream in, Class planClass, Configuration conf, + boolean cloningPlan) { + Kryo kryo = borrowKryo(); + T result = null; + try { + result = deserializePlan(kryo, in, planClass, conf, cloningPlan); + } finally { + releaseKryo(kryo); + } + return result; + } + + private static T deserializePlan(Kryo kryo, InputStream in, Class planClass, + Configuration conf, boolean cloningPlan) { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN); + T plan; + String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo"); + LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType); + if ("javaXML".equalsIgnoreCase(serializationType)) { + plan = deserializeObjectByJavaXML(in); + } else { + if (cloningPlan) { + plan = deserializeObjectByKryo(kryo, in, planClass); + } else { + plan = deserializeObjectByKryo(kryo, in, planClass); + } + } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN); + return plan; + } + + /** + * Clones using the powers of XML. Do not use unless necessary. + * @param plan The plan. + * @return The clone. + */ + public static MapredWork clonePlan(MapredWork plan) { + // TODO: need proper clone. 
Meanwhile, let's at least keep this horror in one place + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + Configuration conf = new HiveConf(); + serializePlan(plan, baos, conf, true); + MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()), + MapredWork.class, conf, true); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN); + return newPlan; + } + + /** + * Clones using the powers of XML. Do not use unless necessary. + * @param plan The plan. + * @return The clone. + */ + public static BaseWork cloneBaseWork(BaseWork plan) { + PerfLogger perfLogger = SessionState.getPerfLogger(); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + Configuration conf = new HiveConf(); + serializePlan(plan, baos, conf, true); + BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()), + plan.getClass(), conf, true); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN); + return newPlan; + } + + /** + * Serialize the object. This helper function mainly makes sure that enums, + * counters, etc are handled properly. + */ + private static void serializeObjectByJavaXML(Object plan, OutputStream out) { + XMLEncoder e = new XMLEncoder(out); + e.setExceptionListener(new ExceptionListener() { + @Override + public void exceptionThrown(Exception e) { + LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new RuntimeException("Cannot serialize object", e); + } + }); + // workaround for java 1.5 + e.setPersistenceDelegate(PlanUtils.ExpressionTypes.class, new EnumDelegate()); + e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); + e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); + e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); + + e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate()); + e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate()); + e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate()); + e.setPersistenceDelegate(Path.class, new PathDelegate()); + + e.writeObject(plan); + e.close(); + } + + + /** + * Java 1.5 workaround. 
From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403 + */ + public static class EnumDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(), + ((Enum) oldInstance).name()}); + } + + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return oldInstance == newInstance; + } + } + + public static class MapDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Map oldMap = (Map) oldInstance; + HashMap newMap = new HashMap(oldMap); + return new Expression(newMap, HashMap.class, "new", new Object[] {}); + } + + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + + @Override + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection) oldInstance; + java.util.Collection newO = (java.util.Collection) newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); + } + } + } + + public static class SetDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Set oldSet = (Set) oldInstance; + HashSet newSet = new HashSet(oldSet); + return new Expression(newSet, HashSet.class, "new", new Object[] {}); + } + + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + + @Override + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection) oldInstance; + java.util.Collection newO = (java.util.Collection) newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); + } + } + + } + + public static class ListDelegate extends DefaultPersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + List oldList = (List) oldInstance; + ArrayList newList = new ArrayList(oldList); + return new Expression(newList, ArrayList.class, "new", new Object[] {}); + } + + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + return false; + } + + @Override + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + java.util.Collection oldO = (java.util.Collection) oldInstance; + java.util.Collection newO = (java.util.Collection) newInstance; + + if (newO.size() != 0) { + out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); + } + for (Iterator i = oldO.iterator(); i.hasNext();) { + out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); + } + } + + } + + /** + * DatePersistenceDelegate. Needed to serialize java.util.Date + * since it is not serialization friendly. + * Also works for java.sql.Date since it derives from java.util.Date. 
+ */ + public static class DatePersistenceDelegate extends PersistenceDelegate { + + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Date dateVal = (Date)oldInstance; + Object[] args = { dateVal.getTime() }; + return new Expression(dateVal, dateVal.getClass(), "new", args); + } + + @Override + protected boolean mutatesTo(Object oldInstance, Object newInstance) { + if (oldInstance == null || newInstance == null) { + return false; + } + return oldInstance.getClass() == newInstance.getClass(); + } + } + + /** + * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since + * it is not serialization friendly. + */ + public static class TimestampPersistenceDelegate extends DatePersistenceDelegate { + @Override + protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { + Timestamp ts = (Timestamp)oldInstance; + Object[] args = { ts.getNanos() }; + Statement stmt = new Statement(oldInstance, "setNanos", args); + out.writeStatement(stmt); + } + } + + /** + * Need to serialize org.antlr.runtime.CommonToken + */ + public static class CommonTokenDelegate extends PersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + CommonToken ct = (CommonToken)oldInstance; + Object[] args = {ct.getType(), ct.getText()}; + return new Expression(ct, ct.getClass(), "new", args); + } + } + + public static class PathDelegate extends PersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Path p = (Path)oldInstance; + Object[] args = {p.toString()}; + return new Expression(p, p.getClass(), "new", args); + } + } + + /** + * @param plan Usually of type MapredWork, MapredLocalWork etc. + * @param out stream in which serialized plan is written into + */ + private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) { + Output output = new Output(out); + kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader()); + kryo.writeObject(output, plan); + output.close(); + } + + /** + * De-serialize an object. This helper function mainly makes sure that enums, + * counters, etc are handled properly. + */ + @SuppressWarnings("unchecked") + private static T deserializeObjectByJavaXML(InputStream in) { + XMLDecoder d = null; + try { + d = new XMLDecoder(in, null, null); + return (T) d.readObject(); + } finally { + if (null != d) { + d.close(); + } + } + } + + private static T deserializeObjectByKryo(Kryo kryo, InputStream in, Class clazz ) { + Input inp = new Input(in); + kryo.setClassLoader(Utilities.getSessionSpecifiedClassLoader()); + T t = kryo.readObject(inp,clazz); + inp.close(); + return t; + } + + public static List> cloneOperatorTree(Configuration conf, List> roots) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + serializePlan(roots, baos, conf, true); + @SuppressWarnings("unchecked") + List> result = + deserializePlan(new ByteArrayInputStream(baos.toByteArray()), + roots.getClass(), conf, true); + return result; + } + + /** + * Serializes expression via Kryo. + * @param expr Expression. + * @return Bytes. + */ + public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) { + return serializeObjectToKryo(expr); + } + + /** + * Deserializes expression from Kryo. + * @param bytes Bytes containing the expression. + * @return Expression; null if deserialization succeeded, but the result type is incorrect. 
+ */ + public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) { + return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class); + } + + public static String serializeExpression(ExprNodeGenericFuncDesc expr) { + try { + return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + + public static ExprNodeGenericFuncDesc deserializeExpression(String s) { + byte[] bytes; + try { + bytes = Base64.decodeBase64(s.getBytes("UTF-8")); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + return deserializeExpressionFromKryo(bytes); + } + + private static byte[] serializeObjectToKryo(Serializable object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + Kryo kryo = borrowKryo(); + try { + kryo.writeObject(output, object); + } finally { + releaseKryo(kryo); + } + output.close(); + return baos.toByteArray(); + } + + private static T deserializeObjectFromKryo(byte[] bytes, Class clazz) { + Input inp = new Input(new ByteArrayInputStream(bytes)); + Kryo kryo = borrowKryo(); + T func = null; + try { + func = kryo.readObject(inp, clazz); + } finally { + releaseKryo(kryo); + } + inp.close(); + return func; + } + + public static String serializeObject(Serializable expr) { + try { + return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8"); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + + public static T deserializeObject(String s, Class clazz) { + try { + return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz); + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("UTF-8 support required", ex); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index dacb80f..c01994f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -20,12 +20,8 @@ import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; -import java.beans.ExceptionListener; import java.beans.Expression; -import java.beans.PersistenceDelegate; import java.beans.Statement; -import java.beans.XMLDecoder; -import java.beans.XMLEncoder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; @@ -36,29 +32,22 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Array; -import java.lang.reflect.Field; import java.net.URI; import java.net.URL; import java.net.URLClassLoader; import java.net.URLDecoder; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLTransientException; -import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -83,12 +72,10 
@@ import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import org.antlr.runtime.CommonToken; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.WordUtils; import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -150,22 +137,16 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; -import org.apache.hadoop.hive.ql.plan.GroupByDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes; import org.apache.hadoop.hive.ql.plan.ReduceWork; -import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; -import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.Adjacency; @@ -181,9 +162,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -209,14 +187,10 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ReflectionUtil; -import org.objenesis.strategy.StdInstantiatorStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.FieldSerializer; import com.google.common.base.Preconditions; /** @@ -391,6 +365,7 @@ public static void setBaseWork(Configuration conf, String name, BaseWork work) { private static BaseWork getBaseWork(Configuration conf, String name) { Path path = null; InputStream in = null; + Kryo kryo = SerializationUtilities.borrowKryo(); try { String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE); if (engine.equals("spark")) { @@ -401,7 +376,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { ClassLoader loader = Thread.currentThread().getContextClassLoader(); ClassLoader newLoader = addToClassPath(loader, 
addedJars.split(";")); Thread.currentThread().setContextClassLoader(newLoader); - runtimeSerializationKryo.get().setClassLoader(newLoader); + kryo.setClassLoader(newLoader); } } @@ -410,16 +385,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { assert path != null; BaseWork gWork = gWorkMap.get(conf).get(path); if (gWork == null) { - Path localPath; - if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) { - localPath = new Path(name); - } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) { - localPath = path; - } else { - LOG.debug("***************non-local mode***************"); - localPath = new Path(name); - } - localPath = path; + Path localPath = path; LOG.debug("local path = " + localPath); if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { LOG.debug("Loading plan from string: "+path.toUri().getPath()); @@ -438,29 +404,29 @@ private static BaseWork getBaseWork(Configuration conf, String name) { if(MAP_PLAN_NAME.equals(name)){ if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ - gWork = deserializePlan(in, MapWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf); } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = deserializePlan(in, MergeFileWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class, conf); } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = deserializePlan(in, ColumnTruncateWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class, conf); } else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { - gWork = deserializePlan(in, PartialScanWork.class,conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, PartialScanWork.class,conf); } else { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)) ; } } else if (REDUCE_PLAN_NAME.equals(name)) { if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) { - gWork = deserializePlan(in, ReduceWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf); } else { throw new RuntimeException("unable to determine work from configuration ." + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ; } } else if (name.contains(MERGE_PLAN_NAME)) { if (name.startsWith(MAPNAME)) { - gWork = deserializePlan(in, MapWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class, conf); } else if (name.startsWith(REDUCENAME)) { - gWork = deserializePlan(in, ReduceWork.class, conf); + gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class, conf); } else { throw new RuntimeException("Unknown work type: " + name); } @@ -480,6 +446,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { LOG.error(msg, e); throw new RuntimeException(msg, e); } finally { + SerializationUtilities.releaseKryo(kryo); if (in != null) { try { in.close(); @@ -523,163 +490,6 @@ public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { return ret; } - /** - * Java 1.5 workaround. 
From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403 - */ - public static class EnumDelegate extends DefaultPersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - return new Expression(Enum.class, "valueOf", new Object[] {oldInstance.getClass(), - ((Enum) oldInstance).name()}); - } - - @Override - protected boolean mutatesTo(Object oldInstance, Object newInstance) { - return oldInstance == newInstance; - } - } - - public static class MapDelegate extends DefaultPersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - Map oldMap = (Map) oldInstance; - HashMap newMap = new HashMap(oldMap); - return new Expression(newMap, HashMap.class, "new", new Object[] {}); - } - - @Override - protected boolean mutatesTo(Object oldInstance, Object newInstance) { - return false; - } - - @Override - protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { - java.util.Collection oldO = (java.util.Collection) oldInstance; - java.util.Collection newO = (java.util.Collection) newInstance; - - if (newO.size() != 0) { - out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); - } - for (Iterator i = oldO.iterator(); i.hasNext();) { - out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); - } - } - } - - public static class SetDelegate extends DefaultPersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - Set oldSet = (Set) oldInstance; - HashSet newSet = new HashSet(oldSet); - return new Expression(newSet, HashSet.class, "new", new Object[] {}); - } - - @Override - protected boolean mutatesTo(Object oldInstance, Object newInstance) { - return false; - } - - @Override - protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { - java.util.Collection oldO = (java.util.Collection) oldInstance; - java.util.Collection newO = (java.util.Collection) newInstance; - - if (newO.size() != 0) { - out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); - } - for (Iterator i = oldO.iterator(); i.hasNext();) { - out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); - } - } - - } - - public static class ListDelegate extends DefaultPersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - List oldList = (List) oldInstance; - ArrayList newList = new ArrayList(oldList); - return new Expression(newList, ArrayList.class, "new", new Object[] {}); - } - - @Override - protected boolean mutatesTo(Object oldInstance, Object newInstance) { - return false; - } - - @Override - protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { - java.util.Collection oldO = (java.util.Collection) oldInstance; - java.util.Collection newO = (java.util.Collection) newInstance; - - if (newO.size() != 0) { - out.writeStatement(new Statement(oldInstance, "clear", new Object[] {})); - } - for (Iterator i = oldO.iterator(); i.hasNext();) { - out.writeStatement(new Statement(oldInstance, "add", new Object[] {i.next()})); - } - } - - } - - /** - * DatePersistenceDelegate. Needed to serialize java.util.Date - * since it is not serialization friendly. - * Also works for java.sql.Date since it derives from java.util.Date. 
- */ - public static class DatePersistenceDelegate extends PersistenceDelegate { - - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - Date dateVal = (Date)oldInstance; - Object[] args = { dateVal.getTime() }; - return new Expression(dateVal, dateVal.getClass(), "new", args); - } - - @Override - protected boolean mutatesTo(Object oldInstance, Object newInstance) { - if (oldInstance == null || newInstance == null) { - return false; - } - return oldInstance.getClass() == newInstance.getClass(); - } - } - - /** - * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since - * it is not serialization friendly. - */ - public static class TimestampPersistenceDelegate extends DatePersistenceDelegate { - @Override - protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) { - Timestamp ts = (Timestamp)oldInstance; - Object[] args = { ts.getNanos() }; - Statement stmt = new Statement(oldInstance, "setNanos", args); - out.writeStatement(stmt); - } - } - - /** - * Need to serialize org.antlr.runtime.CommonToken - */ - public static class CommonTokenDelegate extends PersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - CommonToken ct = (CommonToken)oldInstance; - Object[] args = {ct.getType(), ct.getText()}; - return new Expression(ct, ct.getClass(), "new", args); - } - } - - public static class PathDelegate extends PersistenceDelegate { - @Override - protected Expression instantiate(Object oldInstance, Encoder out) { - Path p = (Path)oldInstance; - Object[] args = {p.toString()}; - return new Expression(p, p.getClass(), "new", args); - } - } - public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { String useName = conf.get(INPUT_NAME); if (useName == null) { @@ -702,6 +512,7 @@ public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScra } private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { + Kryo kryo = SerializationUtilities.borrowKryo(); try { setPlanPath(conf, hiveScratchDir); @@ -714,7 +525,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); try { out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); - serializePlan(w, out, conf); + SerializationUtilities.serializePlan(kryo, w, out, conf); out.close(); out = null; } finally { @@ -728,7 +539,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch FileSystem fs = planPath.getFileSystem(conf); try { out = fs.create(planPath); - serializePlan(w, out, conf); + SerializationUtilities.serializePlan(kryo, w, out, conf); out.close(); out = null; } finally { @@ -760,6 +571,8 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch String msg = "Error caching " + name + ": " + e; LOG.error(msg, e); throw new RuntimeException(msg, e); + } finally { + SerializationUtilities.releaseKryo(kryo); } } @@ -790,73 +603,6 @@ public static Path getPlanPath(Configuration conf) { return null; } - /** - * Serializes expression via Kryo. - * @param expr Expression. - * @return Bytes. - */ - public static byte[] serializeExpressionToKryo(ExprNodeGenericFuncDesc expr) { - return serializeObjectToKryo(expr); - } - - /** - * Deserializes expression from Kryo. - * @param bytes Bytes containing the expression. 
- * @return Expression; null if deserialization succeeded, but the result type is incorrect. - */ - public static ExprNodeGenericFuncDesc deserializeExpressionFromKryo(byte[] bytes) { - return deserializeObjectFromKryo(bytes, ExprNodeGenericFuncDesc.class); - } - - public static String serializeExpression(ExprNodeGenericFuncDesc expr) { - try { - return new String(Base64.encodeBase64(serializeExpressionToKryo(expr)), "UTF-8"); - } catch (UnsupportedEncodingException ex) { - throw new RuntimeException("UTF-8 support required", ex); - } - } - - public static ExprNodeGenericFuncDesc deserializeExpression(String s) { - byte[] bytes; - try { - bytes = Base64.decodeBase64(s.getBytes("UTF-8")); - } catch (UnsupportedEncodingException ex) { - throw new RuntimeException("UTF-8 support required", ex); - } - return deserializeExpressionFromKryo(bytes); - } - - private static byte[] serializeObjectToKryo(Serializable object) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Output output = new Output(baos); - runtimeSerializationKryo.get().writeObject(output, object); - output.close(); - return baos.toByteArray(); - } - - private static T deserializeObjectFromKryo(byte[] bytes, Class clazz) { - Input inp = new Input(new ByteArrayInputStream(bytes)); - T func = runtimeSerializationKryo.get().readObject(inp, clazz); - inp.close(); - return func; - } - - public static String serializeObject(Serializable expr) { - try { - return new String(Base64.encodeBase64(serializeObjectToKryo(expr)), "UTF-8"); - } catch (UnsupportedEncodingException ex) { - throw new RuntimeException("UTF-8 support required", ex); - } - } - - public static T deserializeObject(String s, Class clazz) { - try { - return deserializeObjectFromKryo(Base64.decodeBase64(s.getBytes("UTF-8")), clazz); - } catch (UnsupportedEncodingException ex) { - throw new RuntimeException("UTF-8 support required", ex); - } - } - public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate { @Override protected Expression instantiate(Object oldInstance, Encoder out) { @@ -872,415 +618,6 @@ protected void initialize(Class type, Object oldInstance, Object newInstance, En } } - /** - * Kryo serializer for timestamp. 
- */ - private static class TimestampSerializer extends - com.esotericsoftware.kryo.Serializer { - - @Override - public Timestamp read(Kryo kryo, Input input, Class clazz) { - Timestamp ts = new Timestamp(input.readLong()); - ts.setNanos(input.readInt()); - return ts; - } - - @Override - public void write(Kryo kryo, Output output, Timestamp ts) { - output.writeLong(ts.getTime()); - output.writeInt(ts.getNanos()); - } - } - - /** Custom Kryo serializer for sql date, otherwise Kryo gets confused between - java.sql.Date and java.util.Date while deserializing - */ - private static class SqlDateSerializer extends - com.esotericsoftware.kryo.Serializer { - - @Override - public java.sql.Date read(Kryo kryo, Input input, Class clazz) { - return new java.sql.Date(input.readLong()); - } - - @Override - public void write(Kryo kryo, Output output, java.sql.Date sqlDate) { - output.writeLong(sqlDate.getTime()); - } - } - - private static class CommonTokenSerializer extends com.esotericsoftware.kryo.Serializer { - @Override - public CommonToken read(Kryo kryo, Input input, Class clazz) { - return new CommonToken(input.readInt(), input.readString()); - } - - @Override - public void write(Kryo kryo, Output output, CommonToken token) { - output.writeInt(token.getType()); - output.writeString(token.getText()); - } - } - - private static class PathSerializer extends com.esotericsoftware.kryo.Serializer { - - @Override - public void write(Kryo kryo, Output output, Path path) { - output.writeString(path.toUri().toString()); - } - - @Override - public Path read(Kryo kryo, Input input, Class type) { - return new Path(URI.create(input.readString())); - } - } - - public static List> cloneOperatorTree(Configuration conf, List> roots) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - serializePlan(roots, baos, conf, true); - @SuppressWarnings("unchecked") - List> result = - deserializePlan(new ByteArrayInputStream(baos.toByteArray()), - roots.getClass(), conf, true); - return result; - } - - private static void serializePlan(Object plan, OutputStream out, Configuration conf, boolean cloningPlan) { - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); - String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo"); - LOG.info("Serializing " + plan.getClass().getSimpleName() + " via " + serializationType); - if("javaXML".equalsIgnoreCase(serializationType)) { - serializeObjectByJavaXML(plan, out); - } else { - if(cloningPlan) { - serializeObjectByKryo(cloningQueryPlanKryo.get(), plan, out); - } else { - serializeObjectByKryo(runtimeSerializationKryo.get(), plan, out); - } - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SERIALIZE_PLAN); - } - /** - * Serializes the plan. - * @param plan The plan, such as QueryPlan, MapredWork, etc. - * @param out The stream to write to. - * @param conf to pick which serialization format is desired. 
- */ - public static void serializePlan(Object plan, OutputStream out, Configuration conf) { - serializePlan(plan, out, conf, false); - } - - private static T deserializePlan(InputStream in, Class planClass, Configuration conf, boolean cloningPlan) { - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN); - T plan; - String serializationType = conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo"); - LOG.info("Deserializing " + planClass.getSimpleName() + " via " + serializationType); - if("javaXML".equalsIgnoreCase(serializationType)) { - plan = deserializeObjectByJavaXML(in); - } else { - if(cloningPlan) { - plan = deserializeObjectByKryo(cloningQueryPlanKryo.get(), in, planClass); - } else { - plan = deserializeObjectByKryo(runtimeSerializationKryo.get(), in, planClass); - } - } - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DESERIALIZE_PLAN); - return plan; - } - /** - * Deserializes the plan. - * @param in The stream to read from. - * @param planClass class of plan - * @param conf configuration - * @return The plan, such as QueryPlan, MapredWork, etc. - */ - public static T deserializePlan(InputStream in, Class planClass, Configuration conf) { - return deserializePlan(in, planClass, conf, false); - } - - /** - * Clones using the powers of XML. Do not use unless necessary. - * @param plan The plan. - * @return The clone. - */ - public static MapredWork clonePlan(MapredWork plan) { - // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN); - ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - Configuration conf = new HiveConf(); - serializePlan(plan, baos, conf, true); - MapredWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()), - MapredWork.class, conf, true); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN); - return newPlan; - } - - /** - * Clones using the powers of XML. Do not use unless necessary. - * @param plan The plan. - * @return The clone. - */ - public static BaseWork cloneBaseWork(BaseWork plan) { - PerfLogger perfLogger = SessionState.getPerfLogger(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN); - ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - Configuration conf = new HiveConf(); - serializePlan(plan, baos, conf, true); - BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()), - plan.getClass(), conf, true); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN); - return newPlan; - } - - /** - * Serialize the object. This helper function mainly makes sure that enums, - * counters, etc are handled properly. 
- */ - private static void serializeObjectByJavaXML(Object plan, OutputStream out) { - XMLEncoder e = new XMLEncoder(out); - e.setExceptionListener(new ExceptionListener() { - @Override - public void exceptionThrown(Exception e) { - LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e)); - throw new RuntimeException("Cannot serialize object", e); - } - }); - // workaround for java 1.5 - e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate()); - e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate()); - e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate()); - e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate()); - - e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate()); - e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate()); - e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate()); - e.setPersistenceDelegate(Path.class, new PathDelegate()); - - e.writeObject(plan); - e.close(); - } - - /** - * @param plan Usually of type MapredWork, MapredLocalWork etc. - * @param out stream in which serialized plan is written into - */ - private static void serializeObjectByKryo(Kryo kryo, Object plan, OutputStream out) { - Output output = new Output(out); - kryo.setClassLoader(getSessionSpecifiedClassLoader()); - kryo.writeObject(output, plan); - output.close(); - } - - /** - * De-serialize an object. This helper function mainly makes sure that enums, - * counters, etc are handled properly. - */ - @SuppressWarnings("unchecked") - private static T deserializeObjectByJavaXML(InputStream in) { - XMLDecoder d = null; - try { - d = new XMLDecoder(in, null, null); - return (T) d.readObject(); - } finally { - if (null != d) { - d.close(); - } - } - } - - private static T deserializeObjectByKryo(Kryo kryo, InputStream in, Class clazz ) { - Input inp = new Input(in); - kryo.setClassLoader(getSessionSpecifiedClassLoader()); - T t = kryo.readObject(inp,clazz); - inp.close(); - return t; - } - - // Kryo is not thread-safe, - // Also new Kryo() is expensive, so we want to do it just once. 
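The thread-local Kryo caches that follow are exactly what this patch retires: instead of one cached Kryo per thread, call sites borrow an instance from a pool in SerializationUtilities and hand it back when done, which is why every converted hunk wraps the work in try/finally. A minimal usage sketch, assuming only the borrowKryo()/releaseKryo() pair visible in this diff; the byte-array helper and class name are illustrative, not part of the patch:

import java.io.ByteArrayOutputStream;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public final class KryoPoolUsageSketch {
  static byte[] serializeToBytes(Object obj) {
    // Check a Kryo instance out of the shared pool; Kryo itself is not thread-safe.
    Kryo kryo = SerializationUtilities.borrowKryo();
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      Output output = new Output(baos);
      kryo.writeObject(output, obj);
      output.close();
      return baos.toByteArray();
    } finally {
      // Always return the instance, even if serialization throws.
      SerializationUtilities.releaseKryo(kryo);
    }
  }
}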
- public static ThreadLocal - runtimeSerializationKryo = new ThreadLocal() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - kryo.register(java.sql.Date.class, new SqlDateSerializer()); - kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); - kryo.register(Path.class, new PathSerializer()); - kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); - ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy( - new StdInstantiatorStrategy()); - removeField(kryo, Operator.class, "colExprMap"); - removeField(kryo, AbstractOperatorDesc.class, "statistics"); - kryo.register(MapWork.class); - kryo.register(ReduceWork.class); - kryo.register(TableDesc.class); - kryo.register(UnionOperator.class); - kryo.register(FileSinkOperator.class); - kryo.register(HiveIgnoreKeyTextOutputFormat.class); - kryo.register(StandardConstantListObjectInspector.class); - kryo.register(StandardConstantMapObjectInspector.class); - kryo.register(StandardConstantStructObjectInspector.class); - kryo.register(SequenceFileInputFormat.class); - kryo.register(HiveSequenceFileOutputFormat.class); - return kryo; - }; - }; - @SuppressWarnings("rawtypes") - protected static void removeField(Kryo kryo, Class type, String fieldName) { - FieldSerializer fld = new FieldSerializer(kryo, type); - fld.removeField(fieldName); - kryo.register(type, fld); - } - - public static ThreadLocal sparkSerializationKryo = new ThreadLocal() { - @Override - protected synchronized Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - kryo.register(java.sql.Date.class, new SqlDateSerializer()); - kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); - kryo.register(Path.class, new PathSerializer()); - kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); - ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - removeField(kryo, Operator.class, "colExprMap"); - removeField(kryo, ColumnInfo.class, "objectInspector"); - removeField(kryo, AbstractOperatorDesc.class, "statistics"); - kryo.register(SparkEdgeProperty.class); - kryo.register(MapWork.class); - kryo.register(ReduceWork.class); - kryo.register(SparkWork.class); - kryo.register(TableDesc.class); - kryo.register(Pair.class); - kryo.register(UnionOperator.class); - kryo.register(FileSinkOperator.class); - kryo.register(HiveIgnoreKeyTextOutputFormat.class); - kryo.register(StandardConstantListObjectInspector.class); - kryo.register(StandardConstantMapObjectInspector.class); - kryo.register(StandardConstantStructObjectInspector.class); - kryo.register(SequenceFileInputFormat.class); - kryo.register(HiveSequenceFileOutputFormat.class); - return kryo; - }; - }; - - private static ThreadLocal cloningQueryPlanKryo = new ThreadLocal() { - @Override - protected Kryo initialValue() { - Kryo kryo = new Kryo(); - kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - kryo.register(CommonToken.class, new CommonTokenSerializer()); - kryo.register(java.sql.Date.class, new SqlDateSerializer()); - kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); - kryo.register(Path.class, new PathSerializer()); - kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); - ((Kryo.DefaultInstantiatorStrategy) 
kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy( - new StdInstantiatorStrategy()); - removeField(kryo, Operator.class, "colExprMap"); - removeField(kryo, AbstractOperatorDesc.class, "statistics"); - kryo.register(MapWork.class); - kryo.register(ReduceWork.class); - kryo.register(TableDesc.class); - kryo.register(UnionOperator.class); - kryo.register(FileSinkOperator.class); - kryo.register(HiveIgnoreKeyTextOutputFormat.class); - kryo.register(StandardConstantListObjectInspector.class); - kryo.register(StandardConstantMapObjectInspector.class); - kryo.register(StandardConstantStructObjectInspector.class); - kryo.register(SequenceFileInputFormat.class); - kryo.register(HiveSequenceFileOutputFormat.class); - return kryo; - }; - }; - - /** - * A kryo {@link Serializer} for lists created via {@link Arrays#asList(Object...)}. - *
- * Note: This serializer does not support cyclic references, so if one of the objects - * gets set the list as attribute this might cause an error during deserialization. - *
- * - * This is from kryo-serializers package. Added explicitly to avoid classpath issues. - */ - private static class ArraysAsListSerializer extends com.esotericsoftware.kryo.Serializer> { - - private Field _arrayField; - - public ArraysAsListSerializer() { - try { - _arrayField = Class.forName( "java.util.Arrays$ArrayList" ).getDeclaredField( "a" ); - _arrayField.setAccessible( true ); - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } - // Immutable causes #copy(obj) to return the original object - setImmutable(true); - } - - @Override - public List read(final Kryo kryo, final Input input, final Class> type) { - final int length = input.readInt(true); - Class componentType = kryo.readClass( input ).getType(); - if (componentType.isPrimitive()) { - componentType = getPrimitiveWrapperClass(componentType); - } - try { - final Object items = Array.newInstance( componentType, length ); - for( int i = 0; i < length; i++ ) { - Array.set(items, i, kryo.readClassAndObject( input )); - } - return Arrays.asList( (Object[])items ); - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } - } - - @Override - public void write(final Kryo kryo, final Output output, final List obj) { - try { - final Object[] array = (Object[]) _arrayField.get( obj ); - output.writeInt(array.length, true); - final Class componentType = array.getClass().getComponentType(); - kryo.writeClass( output, componentType ); - for( final Object item : array ) { - kryo.writeClassAndObject( output, item ); - } - } catch ( final RuntimeException e ) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } - } - - private Class getPrimitiveWrapperClass(final Class c) { - if (c.isPrimitive()) { - if (c.equals(Long.TYPE)) { - return Long.class; - } else if (c.equals(Integer.TYPE)) { - return Integer.class; - } else if (c.equals(Double.TYPE)) { - return Double.class; - } else if (c.equals(Float.TYPE)) { - return Float.class; - } else if (c.equals(Boolean.TYPE)) { - return Boolean.class; - } else if (c.equals(Character.TYPE)) { - return Character.class; - } else if (c.equals(Short.TYPE)) { - return Short.class; - } else if (c.equals(Byte.TYPE)) { - return Byte.class; - } - } - return c; - } - } - public static TableDesc defaultTd; static { // by default we expect ^A separated strings diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index df96d8c..2129bda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -31,6 +31,7 @@ import java.util.Properties; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.mapreduce.MRJobConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -740,12 +741,13 @@ public static void main(String[] args) throws IOException, HiveException { int ret; if (localtask) { memoryMXBean = ManagementFactory.getMemoryMXBean(); - MapredLocalWork plan = Utilities.deserializePlan(pathData, MapredLocalWork.class, conf); + MapredLocalWork plan = SerializationUtilities.deserializePlan(pathData, MapredLocalWork.class, + conf); MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent); ret = ed.executeInProcess(new DriverContext()); } else { - 
MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf); + MapredWork plan = SerializationUtilities.deserializePlan(pathData, MapredWork.class, conf); ExecDriver ed = new ExecDriver(plan, conf, isSilent); ret = ed.execute(new DriverContext()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 058d63d..cb70ac8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -177,7 +178,7 @@ public int execute(DriverContext driverContext) { OutputStream out = null; try { out = FileSystem.getLocal(conf).create(planPath); - Utilities.serializePlan(plan, out, conf); + SerializationUtilities.serializePlan(plan, out, conf); out.close(); out = null; } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index bfe21db..cb7dfa1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -33,6 +33,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -159,7 +160,7 @@ public int executeInChildVM(DriverContext driverContext) { OutputStream out = null; try { out = FileSystem.getLocal(conf).create(planPath); - Utilities.serializePlan(plan, out, conf); + SerializationUtilities.serializePlan(plan, out, conf); out.close(); out = null; } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index a0c9b98..f2f3c09 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -29,15 +29,12 @@ import java.util.Collections; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hive.common.util.HashCodeUtil; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; @@ -58,6 +55,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.BloomFilter; +import org.apache.hive.common.util.HashCodeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
com.esotericsoftware.kryo.Kryo; @@ -159,8 +159,13 @@ public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount) } else { InputStream inputStream = Files.newInputStream(hashMapLocalPath); com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream); - Kryo kryo = Utilities.runtimeSerializationKryo.get(); - BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class); + Kryo kryo = SerializationUtilities.borrowKryo(); + BytesBytesMultiHashMap restoredHashMap = null; + try { + restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class); + } finally { + SerializationUtilities.releaseKryo(kryo); + } if (rowCount > 0) { restoredHashMap.expandAndRehashToTarget(rowCount); @@ -551,10 +556,14 @@ public long spillPartition(int partitionId) throws IOException { com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream); - Kryo kryo = Utilities.runtimeSerializationKryo.get(); - kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap - output.close(); - outputStream.close(); + Kryo kryo = SerializationUtilities.borrowKryo(); + try { + kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap + output.close(); + outputStream.close(); + } finally { + SerializationUtilities.releaseKryo(kryo); + } partition.hashMapLocalPath = path; partition.hashMapOnDisk = true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java index 6d391a3..a976de0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java @@ -17,21 +17,22 @@ */ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.metadata.HiveException; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; /** * An eager object container that puts every row directly to output stream. 
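The HybridHashTableContainer hunks above and the ObjectContainer changes below follow one shape: borrow the pooled Kryo only for the duration of a spill or a reload, and release it in a finally block so a failed write or read cannot leak an instance. A sketch of that shape under the same borrowKryo()/releaseKryo() assumption; the generic payload type stands in for BytesBytesMultiHashMap and is illustrative:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

final class SpillSketch {
  static <T> void spill(Path file, T payload) throws IOException {
    try (OutputStream os = Files.newOutputStream(file)) {
      Output output = new Output(os);
      Kryo kryo = SerializationUtilities.borrowKryo();
      try {
        kryo.writeObject(output, payload); // serialize while this thread owns the instance
        output.close();
      } finally {
        SerializationUtilities.releaseKryo(kryo);
      }
    }
  }

  static <T> T restore(Path file, Class<T> clazz) throws IOException {
    try (InputStream is = Files.newInputStream(file)) {
      Input input = new Input(is);
      Kryo kryo = SerializationUtilities.borrowKryo();
      try {
        return kryo.readObject(input, clazz);
      } finally {
        SerializationUtilities.releaseKryo(kryo);
      }
    }
  }
}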
@@ -58,14 +59,11 @@ private Input input; private Output output; - private Kryo kryo; - public ObjectContainer() { readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS]; for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { readBuffer[i] = (ROW) new Object(); } - kryo = Utilities.runtimeSerializationKryo.get(); try { setupOutput(); } catch (IOException | HiveException e) { @@ -101,7 +99,12 @@ private void setupOutput() throws IOException, HiveException { } public void add(ROW row) { - kryo.writeClassAndObject(output, row); + Kryo kryo = SerializationUtilities.borrowKryo(); + try { + kryo.writeClassAndObject(output, row); + } finally { + SerializationUtilities.releaseKryo(kryo); + } rowsOnDisk++; } @@ -164,8 +167,13 @@ public ROW next() { rowsInReadBuffer = rowsOnDisk; } - for (int i = 0; i < rowsInReadBuffer; i++) { - readBuffer[i] = (ROW) kryo.readClassAndObject(input); + Kryo kryo = SerializationUtilities.borrowKryo(); + try { + for (int i = 0; i < rowsInReadBuffer; i++) { + readBuffer[i] = (ROW) kryo.readClassAndObject(input); + } + } finally { + SerializationUtilities.releaseKryo(kryo); } if (input.eof()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java index fd7109a..d7c278a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java @@ -24,11 +24,12 @@ import java.io.IOException; import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.mapred.JobConf; +import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -39,15 +40,28 @@ ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); - Utilities.sparkSerializationKryo.get().writeObject(output, object); + Kryo kryo = SerializationUtilities.borrowKryo(); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + try { + kryo.writeObject(output, object); + } finally { + SerializationUtilities.releaseKryo(kryo); + } output.close(); // close() also calls flush() return stream.toByteArray(); } public static T deserialize(byte[] buffer, Class clazz) { - return Utilities.sparkSerializationKryo.get().readObject( - new Input(new ByteArrayInputStream(buffer)), clazz); + Kryo kryo = SerializationUtilities.borrowKryo(); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + T result = null; + try { + result = kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz); + } finally { + SerializationUtilities.releaseKryo(kryo); + } + return result; } public static byte[] serializeJobConf(JobConf jobConf) { @@ -80,8 +94,4 @@ public static JobConf deserializeJobConf(byte[] buffer) { return conf; } - public static void setClassLoader(ClassLoader classLoader) { - Utilities.sparkSerializationKryo.get().setClassLoader(classLoader); - } - } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index c4cb2ba..6380774 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -298,7 +298,6 @@ public Serializable call(JobContext jc) throws Exception { Map addedJars = jc.getAddedJars(); if (addedJars != null && !addedJars.isEmpty()) { SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir()); - KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader()); localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";")); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 3feab1a..b19c70a 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; @@ -532,14 +533,14 @@ public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) { if (!hasObj) { Serializable filterObject = scanDesc.getFilterObject(); if (filterObject != null) { - serializedFilterObj = Utilities.serializeObject(filterObject); + serializedFilterObj = SerializationUtilities.serializeObject(filterObject); } } if (serializedFilterObj != null) { jobConf.set(TableScanDesc.FILTER_OBJECT_CONF_STR, serializedFilterObj); } if (!hasExpr) { - serializedFilterExpr = Utilities.serializeExpression(filterExpr); + serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr); } String filterText = filterExpr.getExprString(); if (LOG.isDebugEnabled()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index 13390de..017676b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; @@ -128,7 +129,7 @@ private void pushFilters(final JobConf jobConf, final TableScanOperator tableSca } final String filterText = filterExpr.getExprString(); - final String filterExprSerialized = Utilities.serializeExpression(filterExpr); + final String filterExprSerialized = SerializationUtilities.serializeExpression(filterExpr); jobConf.set( TableScanDesc.FILTER_TEXT_CONF_STR, filterText); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java index 7e888bc..6d3a134 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ConvertAstToSearchArg.java @@ -23,11 +23,9 @@ import java.util.List; import org.apache.commons.codec.binary.Base64; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import 
org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -51,6 +49,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; @@ -433,7 +433,7 @@ public static SearchArgument create(byte[] kryoBytes) { public static SearchArgument createFromConf(Configuration conf) { String sargString; if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) { - return create(Utilities.deserializeExpression(sargString)); + return create(SerializationUtilities.deserializeExpression(sargString)); } else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) { return create(sargString); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 44189ef..c682df2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -18,10 +18,32 @@ package org.apache.hadoop.hive.ql.metadata; -import com.google.common.collect.Sets; +import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; +import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM; +import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR; +import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; +import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM; +import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM; +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; +import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -81,6 +103,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.InPlaceUpdates; import org.apache.hadoop.hive.ql.index.HiveIndexHandler; @@ -100,32 +123,10 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.io.PrintStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import 
java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; -import static org.apache.hadoop.hive.serde.serdeConstants.COLLECTION_DELIM; -import static org.apache.hadoop.hive.serde.serdeConstants.ESCAPE_CHAR; -import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; -import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM; -import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; -import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; +import com.google.common.collect.Sets; /** * This class has functions that implement meta data/DDL operations using calls @@ -2087,7 +2088,7 @@ public boolean dropPartition(String dbName, String tableName, List partV new ArrayList>(partSpecs.size()); for (DropTableDesc.PartSpec partSpec : partSpecs) { partExprs.add(new ObjectPair(partSpec.getPrefixLength(), - Utilities.serializeExpressionToKryo(partSpec.getPartSpec()))); + SerializationUtilities.serializeExpressionToKryo(partSpec.getPartSpec()))); } List tParts = getMSC().dropPartitions( dbName, tblName, partExprs, dropOptions); @@ -2362,7 +2363,7 @@ public boolean dropPartition(String dbName, String tableName, List partV public boolean getPartitionsByExpr(Table tbl, ExprNodeGenericFuncDesc expr, HiveConf conf, List result) throws HiveException, TException { assert result != null; - byte[] exprBytes = Utilities.serializeExpressionToKryo(expr); + byte[] exprBytes = SerializationUtilities.serializeExpressionToKryo(expr); String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULTPARTITIONNAME); List msParts = new ArrayList(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 1f6b5d7..e9ca5fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -485,7 +486,7 @@ public static boolean cannotConvert(long aliasKnownSize, } // deep copy a new mapred work from xml // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan - MapredWork newWork = Utilities.clonePlan(currTask.getWork()); + MapredWork newWork = SerializationUtilities.clonePlan(currTask.getWork()); // create map join task and set big table as i MapRedTask newTask = convertTaskToMapJoinTask(newWork, pos); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 15f0d70..a71c474 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hive.ql.optimizer.physical; +import 
java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -27,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -53,13 +61,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - /** * GenMRSkewJoinProcessor. * @@ -253,7 +254,7 @@ public static void processSkewJoin(JoinOperator joinOp, HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns); - MapredWork clonePlan = Utilities.clonePlan(currPlan); + MapredWork clonePlan = SerializationUtilities.clonePlan(currPlan); Operator[] parentOps = new TableScanOperator[tags.length]; for (int k = 0; k < tags.length; k++) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 895e64e..41d3522 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.ql.optimizer.physical; -import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; @@ -62,12 +65,10 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import com.google.common.base.Preconditions; /** * Copied from GenMRSkewJoinProcessor. 
It's used for spark task @@ -254,7 +255,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task> reducerList = new ArrayList>(); reducerList.add(reduceWork.getReducer()); - Operator reducer = Utilities.cloneOperatorTree( + Operator reducer = SerializationUtilities.cloneOperatorTree( parseCtx.getConf(), reducerList).get(0); Preconditions.checkArgument(reducer instanceof JoinOperator, "Reducer should be join operator, but actually is " + reducer.getName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java index e94f6e7..dc433fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java @@ -19,29 +19,18 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Comparator; -import java.util.Iterator; import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.SortedSet; import java.util.Stack; -import java.util.TreeSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.StatsTask; -import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TableScanOperator; -import org.apache.hadoop.hive.ql.exec.tez.DagUtils; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.tez.TezTask; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -52,13 +41,14 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MergeJoinWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SerializeFilter is a simple physical optimizer that serializes all filter expressions in @@ -151,7 +141,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString()); } ts.getConf().setSerializedFilterExpr( - Utilities.serializeExpression(ts.getConf().getFilterExpr())); + SerializationUtilities.serializeExpression(ts.getConf().getFilterExpr())); } if (ts.getConf() != null && ts.getConf().getFilterObject() != null) { @@ -160,7 +150,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } ts.getConf().setSerializedFilterObject( - Utilities.serializeObject(ts.getConf().getFilterObject())); + SerializationUtilities.serializeObject(ts.getConf().getFilterObject())); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index 3b09c2f..658717c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -147,7 +148,7 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera throws SemanticException { try { // deep copy a new mapred work - MapredWork currJoinWork = Utilities.clonePlan(currWork); + MapredWork currJoinWork = SerializationUtilities.clonePlan(currWork); SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork); // change the newly created map-red plan as if it was a join operator @@ -165,7 +166,7 @@ private MapRedTask convertSMBTaskToMapJoinTask(MapredWork origWork, SMBMapJoinOperator smbJoinOp) throws UnsupportedEncodingException, SemanticException { // deep copy a new mapred work - MapredWork newWork = Utilities.clonePlan(origWork); + MapredWork newWork = SerializationUtilities.clonePlan(origWork); // create a mapred task for this work MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext .getParseContext().getConf()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java index f9978b4..42ad04b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java @@ -22,13 +22,11 @@ import java.nio.ByteBuffer; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.metastore.Metastore.SplitInfo; import org.apache.hadoop.hive.metastore.Metastore.SplitInfos; import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.ReaderImpl; @@ -38,6 +36,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The basic implementation of PartitionExpressionProxy that uses ql package classes. 
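Both sides of the metastore call now share one pair of helpers: Hive.getPartitionsByExpr serializes the partition filter with serializeExpressionToKryo, and the deserializeExpr hunk that follows decodes it with deserializeExpressionFromKryo. A round-trip sketch, assuming those byte-array signatures stay as shown in this patch; the column name, table alias, and main() harness are illustrative:

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public final class PartitionExprRoundTrip {
  public static void main(String[] args) {
    // ds = '2015-10-01', the kind of predicate pushed to the metastore for pruning
    ExprNodeDesc column = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "ds", "t", true);
    ExprNodeDesc constant = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "2015-10-01");
    ExprNodeGenericFuncDesc expr = new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
        new GenericUDFOPEqual(), Arrays.asList(column, constant));

    byte[] exprBytes = SerializationUtilities.serializeExpressionToKryo(expr);   // client side
    ExprNodeGenericFuncDesc restored =
        SerializationUtilities.deserializeExpressionFromKryo(exprBytes);         // metastore side
    System.out.println(restored.getExprString());
  }
}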
@@ -71,7 +71,7 @@ public boolean filterPartitionsByExpr(List partColumnNames, private ExprNodeGenericFuncDesc deserializeExpr(byte[] exprBytes) throws MetaException { ExprNodeGenericFuncDesc expr = null; try { - expr = Utilities.deserializeExpressionFromKryo(exprBytes); + expr = SerializationUtilities.deserializeExpressionFromKryo(exprBytes); } catch (Exception ex) { LOG.error("Failed to deserialize the expression", ex); throw new MetaException(ex.getMessage()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java index fb20080..6931ad9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java @@ -30,8 +30,8 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver; @@ -95,7 +95,7 @@ private void splitBaseWork(SparkWork sparkWork, BaseWork parentWork, List> newRoots = Utilities.cloneOperatorTree(conf, roots); + List> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots); // we're cloning the operator plan but we're retaining the original work. That means // that root operators have to be replaced with the cloned ops. The replacement map diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 40c23a5..8dc48cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -28,8 +28,6 @@ import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -43,9 +41,9 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; @@ -60,10 +58,12 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import org.apache.hadoop.hive.ql.plan.TableDesc; /** * GenSparkUtils is a collection of shared helper methods to produce SparkWork @@ -207,7 +207,7 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context } // need to clone the plan. 
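The "need to clone the plan" comment above introduces the call that follows; like the clonePlan calls in CommonJoinTaskDispatcher, SortMergeJoinTaskDispatcher, and GenMRSkewJoinProcessor earlier in the patch, it is a deep copy done by serializing and deserializing the work. A minimal sketch of the two entry points, assuming the signatures used at these call sites; the wrapper class is illustrative:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.MapredWork;

final class PlanCloneSketch {
  // Deep copy of a whole MapredWork, taken before a dispatcher rewrites it into a map-join task.
  static MapredWork copyWork(MapredWork original) {
    return SerializationUtilities.clonePlan(original);
  }

  // Deep copy of an operator tree; callers then pair originals with clones to re-wire
  // anything that still points at the original root operators.
  static List<Operator<?>> copyRoots(Configuration conf, List<Operator<?>> roots) {
    return SerializationUtilities.cloneOperatorTree(conf, roots);
  }
}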
- List> newRoots = Utilities.cloneOperatorTree(conf, roots); + List> newRoots = SerializationUtilities.cloneOperatorTree(conf, roots); // Build a map to map the original FileSinkOperator and the cloned FileSinkOperators // This map is used for set the stats flag for the cloned FileSinkOperators in later process diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java index c140f67..4bb661a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SplitOpTreeForDPP.java @@ -24,10 +24,10 @@ import java.util.Set; import java.util.Stack; -import com.google.common.base.Preconditions; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.SemanticException; +import com.google.common.base.Preconditions; + /** * This processor triggers on SparkPartitionPruningSinkOperator. For a operator tree like * this: @@ -105,7 +107,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, filterOp.setChildOperators(Utilities.makeList(selOp)); // Now clone the tree above selOp - List> newRoots = Utilities.cloneOperatorTree(context.parseContext.getConf(), roots); + List> newRoots = SerializationUtilities.cloneOperatorTree( + context.parseContext.getConf(), roots); for (int i = 0; i < roots.size(); i++) { TableScanOperator newTs = (TableScanOperator) newRoots.get(i); TableScanOperator oldTs = (TableScanOperator) roots.get(i); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java index 5c5fafa..e2aaa70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java @@ -24,17 +24,18 @@ import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.stats.StatsAggregator; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; public class FSStatsAggregator implements StatsAggregator { @@ -62,7 +63,12 @@ public boolean accept(Path file) { }); for (FileStatus file : status) { Input in = new Input(fs.open(file.getPath())); - statsMap = Utilities.runtimeSerializationKryo.get().readObject(in, statsMap.getClass()); + Kryo kryo = SerializationUtilities.borrowKryo(); + try { + statsMap = kryo.readObject(in, statsMap.getClass()); + } finally { + SerializationUtilities.releaseKryo(kryo); + } LOG.info("Read stats : " +statsMap); statsList.add(statsMap); 
in.close(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java index 80f954b..e5d89e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java @@ -24,15 +24,16 @@ import java.util.Map; import java.util.Map.Entry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Output; public class FSStatsPublisher implements StatsPublisher { @@ -100,7 +101,12 @@ public boolean closeConnection(StatsCollectionContext context) { Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true)); LOG.debug("Created file : " + statsFile); LOG.debug("Writing stats in it : " + statsMap); - Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap); + Kryo kryo = SerializationUtilities.borrowKryo(); + try { + kryo.writeObject(output, statsMap); + } finally { + SerializationUtilities.releaseKryo(kryo); + } output.close(); return true; } catch (IOException e) { diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java index d6d513d..5e53604 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/TestMetastoreExpr.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Stack; -import junit.framework.TestCase; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -36,7 +34,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -52,6 +50,8 @@ import com.google.common.collect.Lists; +import junit.framework.TestCase; + /** * Tests hive metastore expression support. 
This should be moved in metastore module * as soon as we are able to use ql from metastore server (requires splitting metastore @@ -166,8 +166,8 @@ public void testPartitionExpr() throws Exception { public void checkExpr(int numParts, String dbName, String tblName, ExprNodeGenericFuncDesc expr) throws Exception { List parts = new ArrayList(); - client.listPartitionsByExpr( - dbName, tblName, Utilities.serializeExpressionToKryo(expr), null, (short)-1, parts); + client.listPartitionsByExpr(dbName, tblName, + SerializationUtilities.serializeExpressionToKryo(expr), null, (short)-1, parts); assertEquals("Partition check failed: " + expr.getExprString(), numParts, parts.size()); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java index 1364888..c1667c2 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestPlan.java @@ -23,8 +23,6 @@ import java.util.ArrayList; import java.util.LinkedHashMap; -import junit.framework.TestCase; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -37,6 +35,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.JobConf; +import junit.framework.TestCase; + /** * TestPlan. * @@ -83,7 +83,7 @@ public void testPlan() throws Exception { JobConf job = new JobConf(TestPlan.class); // serialize the configuration once .. ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Utilities.serializePlan(mrwork, baos, job); + SerializationUtilities.serializePlan(mrwork, baos, job); baos.close(); String v1 = baos.toString(); @@ -101,7 +101,7 @@ public void testPlan() throws Exception { // serialize again baos.reset(); - Utilities.serializePlan(mrwork2, baos, job); + SerializationUtilities.serializePlan(mrwork2, baos, job); baos.close(); // verify that the two are equal diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 028cdd1..bb6a4e1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -26,16 +26,8 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Set; - -import com.google.common.collect.Sets; -import com.google.common.io.Files; -import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -46,6 +38,14 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +import junit.framework.Assert; +import junit.framework.TestCase; public class TestUtilities extends TestCase { public static final Logger LOG = LoggerFactory.getLogger(TestUtilities.class); @@ -85,8 +85,8 @@ public void testSerializeTimestamp() { children.add(constant); ExprNodeGenericFuncDesc desc = new ExprNodeGenericFuncDesc(TypeInfoFactory.timestampTypeInfo, new 
GenericUDFFromUtcTimestamp(), children); - assertEquals(desc.getExprString(), Utilities.deserializeExpression( - Utilities.serializeExpression(desc)).getExprString()); + assertEquals(desc.getExprString(), SerializationUtilities.deserializeExpression( + SerializationUtilities.serializeExpression(desc)).getExprString()); } public void testgetDbTableName() throws HiveException{ diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 9f616ab..1ff7eb5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -17,7 +17,13 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import java.io.DataInput; import java.io.DataOutput; @@ -51,6 +57,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -67,7 +74,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; -import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -1609,7 +1615,7 @@ JobConf createMockExecutionEnvironment(Path workDir, Path mapXml = new Path(workDir, "map.xml"); localFs.delete(mapXml, true); FSDataOutputStream planStream = localFs.create(mapXml); - Utilities.serializePlan(mapWork, planStream, conf); + SerializationUtilities.serializePlan(mapWork, planStream, conf); planStream.close(); return conf; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java index 3560c43..7a93b54 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -121,7 +121,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { childExpr.add(col); childExpr.add(con); ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - String sargStr = Utilities.serializeExpression(en); + String sargStr = SerializationUtilities.serializeExpression(en); 
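Every expression-related test update in this section funnels through the same two calls, so a compact round-trip sketch may help. Only the class name, column and constant below are invented; the constructors and the SerializationUtilities calls are the ones the hunks themselves use.

// Sketch of the serialize/deserialize round trip that now lives on SerializationUtilities.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ExprRoundTripSketch {
  public static void main(String[] args) {
    List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
    children.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "key", null, false));
    children.add(new ExprNodeConstantDesc("value"));
    ExprNodeGenericFuncDesc expr = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqual(), children);

    // Text form (Kryo bytes, base64 encoded) that can be stored in a Configuration.
    String serialized = SerializationUtilities.serializeExpression(expr);
    ExprNodeGenericFuncDesc back = SerializationUtilities.deserializeExpression(serialized);

    // getExprString() survives the round trip, which is what the updated tests assert.
    System.out.println(expr.getExprString().equals(back.getExprString()));
  }
}

The plan-level calls in the TestPlan and TestInputOutputFormat hunks move the same way with an unchanged argument list: SerializationUtilities.serializePlan(work, outputStream, conf).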
conf.set("hive.io.filter.expr.serialized", sargStr); InputSplit[] splits = in.getSplits(conf, 1); assertEquals(5, splits.length); @@ -129,7 +129,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(1); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(0, splits.length); @@ -137,7 +137,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(2); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(1, splits.length); @@ -145,7 +145,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(5); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(2, splits.length); @@ -153,7 +153,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(13); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(3, splits.length); @@ -161,7 +161,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(29); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(4, splits.length); @@ -169,7 +169,7 @@ public void testSplitEliminationSmallMaxSplit() throws Exception { con = new ExprNodeConstantDesc(70); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); assertEquals(5, splits.length); @@ -199,7 +199,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { childExpr.add(col); childExpr.add(con); ExprNodeGenericFuncDesc en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - String sargStr = Utilities.serializeExpression(en); + String sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); InputSplit[] splits = in.getSplits(conf, 1); assertEquals(2, splits.length); @@ -207,7 +207,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(0); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = 
in.getSplits(conf, 1); // no stripes satisfies the condition @@ -216,7 +216,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(2); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // only first stripe will satisfy condition and hence single split @@ -225,7 +225,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(5); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // first stripe will satisfy the predicate and will be a single split, last stripe will be a @@ -235,7 +235,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(13); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // first 2 stripes will satisfy the predicate and merged to single split, last stripe will be a @@ -245,7 +245,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(29); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // first 3 stripes will satisfy the predicate and merged to single split, last stripe will be a @@ -255,7 +255,7 @@ public void testSplitEliminationLargeMaxSplit() throws Exception { con = new ExprNodeConstantDesc(70); childExpr.set(1, con); en = new ExprNodeGenericFuncDesc(inspector, udf, childExpr); - sargStr = Utilities.serializeExpression(en); + sargStr = SerializationUtilities.serializeExpression(en); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // first 2 stripes will satisfy the predicate and merged to single split, last two stripe will @@ -304,7 +304,7 @@ public void testSplitEliminationComplexExpr() throws Exception { childExpr2.add(en1); ExprNodeGenericFuncDesc en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2); - String sargStr = Utilities.serializeExpression(en2); + String sargStr = SerializationUtilities.serializeExpression(en2); conf.set("hive.io.filter.expr.serialized", sargStr); InputSplit[] splits = in.getSplits(conf, 1); assertEquals(2, splits.length); @@ -321,7 +321,7 @@ public void testSplitEliminationComplexExpr() throws Exception { childExpr2.set(1, en1); en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2); - sargStr = Utilities.serializeExpression(en2); + sargStr = SerializationUtilities.serializeExpression(en2); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // no stripe will satisfy the predicate @@ -339,7 +339,7 @@ public void testSplitEliminationComplexExpr() throws Exception { childExpr2.set(1, en1); en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2); - sargStr = 
Utilities.serializeExpression(en2); + sargStr = SerializationUtilities.serializeExpression(en2); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // only first stripe will satisfy condition and hence single split @@ -358,7 +358,7 @@ public void testSplitEliminationComplexExpr() throws Exception { childExpr2.set(1, en1); en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2); - sargStr = Utilities.serializeExpression(en2); + sargStr = SerializationUtilities.serializeExpression(en2); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // first two stripes will satisfy condition and hence single split @@ -378,7 +378,7 @@ public void testSplitEliminationComplexExpr() throws Exception { childExpr2.set(1, en1); en2 = new ExprNodeGenericFuncDesc(inspector, udf2, childExpr2); - sargStr = Utilities.serializeExpression(en2); + sargStr = SerializationUtilities.serializeExpression(en2); conf.set("hive.io.filter.expr.serialized", sargStr); splits = in.getSplits(conf, 1); // only second stripes will satisfy condition and hence single split diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java index 7204521..bf363f3 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java @@ -18,12 +18,19 @@ package org.apache.hadoop.hive.ql.io.parquet; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; -import org.apache.hadoop.hive.ql.plan.*; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -34,16 +41,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import com.google.common.collect.Lists; public class TestParquetRowGroupFilter extends AbstractTestParquetDirect { @@ -96,7 +101,7 @@ public void write(RecordConsumer consumer) { children.add(columnDesc); children.add(constantDesc); ExprNodeGenericFuncDesc genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children); - String searchArgumentStr = Utilities.serializeExpression(genericFuncDesc); + String searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc); 
conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr); ParquetRecordReaderWrapper recordReader = (ParquetRecordReaderWrapper) @@ -109,7 +114,7 @@ public void write(RecordConsumer consumer) { constantDesc = new ExprNodeConstantDesc(100); children.set(1, constantDesc); genericFuncDesc = new ExprNodeGenericFuncDesc(inspector, udf, children); - searchArgumentStr = Utilities.serializeExpression(genericFuncDesc); + searchArgumentStr = SerializationUtilities.serializeExpression(genericFuncDesc); conf.set(TableScanDesc.FILTER_EXPR_CONF_STR, searchArgumentStr); recordReader = (ParquetRecordReaderWrapper) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java index e72789d..a0fa700 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java @@ -22,23 +22,22 @@ import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; -import com.google.common.collect.Sets; +import java.beans.XMLDecoder; +import java.io.ByteArrayInputStream; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Set; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.junit.Test; -import java.beans.XMLDecoder; -import java.io.ByteArrayInputStream; -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Set; - -import org.apache.parquet.filter2.predicate.FilterPredicate; +import com.google.common.collect.Sets; /** * These tests cover the conversion from Hive's AST to SearchArguments. 
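The ORC and Parquet test hunks above all exercise the same handoff: serialize the predicate, put it in the job conf, and let the record reader pick it up. Here is a short sketch of that contract, using only the calls visible in those hunks (TableScanDesc.FILTER_EXPR_CONF_STR is the constant behind the "hive.io.filter.expr.serialized" key the ORC tests set directly); the class and method names are illustrative.

// Sketch of the filter-pushdown handoff used by the ORC and Parquet tests above.
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class FilterPushdownSketch {
  /** Producer side: stash the serialized predicate for the input format. */
  public static void publishFilter(JobConf conf, ExprNodeGenericFuncDesc filter) {
    conf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
        SerializationUtilities.serializeExpression(filter));
  }

  /** Reader side: recover the expression, if one was pushed down. */
  public static ExprNodeGenericFuncDesc readFilter(JobConf conf) {
    String serialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    return serialized == null ? null : SerializationUtilities.deserializeExpression(serialized);
  }
}

The TestConvertAstToSearchArg hunks that follow cover the reader side in more depth, turning such a stored expression into a SearchArgument.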
@@ -2713,7 +2712,7 @@ public void TestTimestampSarg() throws Exception { "AAABgj0BRVFVQcwBBW9yZy5hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5Q" + "EAAAECAQFib29sZWHu"; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2732,7 +2731,7 @@ public void TestDateSarg() throws Exception { "Y2hlLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUH" + "MAQVvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g=="; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2752,7 +2751,7 @@ public void TestDecimalSarg() throws Exception { "oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQZvcmcuYXBhY2" + "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g=="; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2772,7 +2771,7 @@ public void TestCharSarg() throws Exception { "vb3AuaGl2ZS5xbC51ZGYuZ2VuZXJpYy5HZW5lcmljVURGT1BFcXVh7AEAAAGCPQFFUVVBzAEGb3JnLm" + "FwYWNoZS5oYWRvb3AuaW8uQm9vbGVhbldyaXRhYmzlAQAAAQQBAWJvb2xlYe4="; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2792,7 +2791,7 @@ public void TestVarcharSarg() throws Exception { "lLmhhZG9vcC5oaXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQ" + "ZvcmcuYXBhY2hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABBAEBYm9vbGVh7g=="; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2811,7 +2810,7 @@ public void TestBigintSarg() throws Exception { "dmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5hcGFjaGU" + "uaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu"; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2832,7 +2831,7 @@ public void TestBooleanSarg() throws Exception { "hlLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAwkBAgEBYrIAAAgBAwkBB29yZy5hcGFjaGUua" + "GFkb29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QQW7kAQEGAQAAAQMJ"; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("(and leaf-0 leaf-1)", 
sarg.getExpression().toString()); assertEquals(2, sarg.getLeaves().size()); @@ -2854,7 +2853,7 @@ public void TestFloatSarg() throws Exception { "aXZlLnFsLnVkZi5nZW5lcmljLkdlbmVyaWNVREZPUEVxdWHsAQAAAYI9AUVRVUHMAQVvcmcuYXBhY2h" + "lLmhhZG9vcC5pby5Cb29sZWFuV3JpdGFibOUBAAABAgEBYm9vbGVh7g=="; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); @@ -2873,7 +2872,7 @@ public void TestDoubleSarg() throws Exception { "b29wLmhpdmUucWwudWRmLmdlbmVyaWMuR2VuZXJpY1VERk9QRXF1YewBAAABgj0BRVFVQcwBBW9yZy5" + "hcGFjaGUuaGFkb29wLmlvLkJvb2xlYW5Xcml0YWJs5QEAAAECAQFib29sZWHu"; SearchArgument sarg = - new ConvertAstToSearchArg(Utilities.deserializeExpression(serialAst)) + new ConvertAstToSearchArg(SerializationUtilities.deserializeExpression(serialAst)) .buildSearchArgument(); assertEquals("leaf-0", sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size());
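Closing the loop on the TestConvertAstToSearchArg hunks: each one decodes a stored expression string and hands it to ConvertAstToSearchArg, so the reader-side path after this patch boils down to the two calls below. The class and method names are mine; the calls are exactly the ones in the hunks.

// Sketch: from a serialized expression string (like the serialAst literals above)
// to a SearchArgument, using the relocated deserializeExpression.
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;

public class SargFromSerializedExprSketch {
  public static SearchArgument toSearchArgument(String serializedExpr) {
    ExprNodeGenericFuncDesc expr = SerializationUtilities.deserializeExpression(serializedExpr);
    return new ConvertAstToSearchArg(expr).buildSearchArgument();
  }
}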