diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java index 0bd788e..032322a 100644 --- a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java +++ b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java @@ -36,8 +36,8 @@ */ public class GenericUDTFCount2 extends GenericUDTF { - Integer count = Integer.valueOf(0); - Object forwardObj[] = new Object[1]; + private transient Integer count = Integer.valueOf(0); + private transient Object forwardObj[] = new Object[1]; @Override public void close() throws HiveException { diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java index da5ae9c..a4dd152 100644 --- a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java +++ b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java @@ -67,7 +67,7 @@ public StructObjectInspector initialize(ObjectInspector[] args) fieldOIs); } - Object forwardObj[] = new Object[2]; + private transient Object forwardObj[] = new Object[2]; @Override public void process(Object[] o) throws HiveException { diff --git a/ivy/libraries.properties b/ivy/libraries.properties index c306440..3b8260e 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -51,6 +51,7 @@ jetty.version=6.1.26 jline.version=0.9.94 json.version=20090211 junit.version=4.10 +kryo.version=2.21 libfb303.version=0.9.0 libthrift.version=0.9.0 log4j.version=1.2.16 diff --git a/ql/build.xml b/ql/build.xml index d185e51..c554149 100644 --- a/ql/build.xml +++ b/ql/build.xml @@ -250,7 +250,15 @@ - + + + + + + @@ -296,6 +304,7 @@ + + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 598bd98..7aa2826 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -465,7 +465,7 @@ public int compile(String command, boolean resetTaskIds) { // deserialize the queryPlan FileInputStream fis = new FileInputStream(queryPlanFileName); - QueryPlan newPlan = Utilities.deserializeObject(fis); + QueryPlan newPlan = Utilities.deserializeObject(fis, QueryPlan.class); fis.close(); // Use the deserialized plan diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java index ee44051..5ee2054 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java @@ -54,7 +54,7 @@ */ private boolean isVirtualCol; - private transient ObjectInspector objectInspector; + private ObjectInspector objectInspector; private boolean isHiddenVirtualCol; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index edb55fa..9792f6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -112,6 +112,9 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; +import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; +import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; 
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -165,6 +168,10 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 /**
  * Utilities.
  *
@@ -268,8 +275,19 @@ public static BaseWork getBaseWork(Configuration conf, String name) {
         localPath = new Path(name);
       }
       InputStream in = new FileInputStream(localPath.toUri().getPath());
-      BaseWork ret = deserializeObject(in);
-      gWork = ret;
+      if(MAP_PLAN_NAME.equals(name)){
+        if("org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, MergeWork.class);
+        } else if("org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, ColumnTruncateWork.class);
+        } else if("org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, PartialScanWork.class);
+        } else {
+          gWork = deserializeObject(in, MapWork.class);
+        }
+      } else {
+        gWork = deserializeObject(in, ReduceWork.class);
+      }
       gWorkMap.put(path, gWork);
     }
     return gWork;
@@ -589,43 +607,42 @@ protected void initialize(Class type, Object oldInstance, Object newInstance, En
 
   /**
    * Serialize the object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
+   * @throws IOException
    */
-  public static void serializeObject(Object plan, OutputStream out) {
-    XMLEncoder e = new XMLEncoder(out);
-    e.setExceptionListener(new ExceptionListener() {
-      public void exceptionThrown(Exception e) {
-        LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
-        throw new RuntimeException("Cannot serialize object", e);
-      }
-    });
-    // workaround for java 1.5
-    e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
-    e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
-    e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
-    e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
-    e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
-
-    e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
-    e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
-
-    e.writeObject(plan);
-    e.close();
+  public static void serializeObject(Object plan, OutputStream out) throws IOException {
+    com.esotericsoftware.minlog.Log.set(2);
+    Kryo kryo = new Kryo();
+    Output output = new Output(out);
+    kryo.writeObject(output, plan);
+    output.close();
   }
 
   /**
    * De-serialize an object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
-  @SuppressWarnings("unchecked")
-  public static <T> T deserializeObject(InputStream in) {
-    XMLDecoder d = null;
+  public static <T> T deserializeObject(InputStream in, Class<T> clazz ) {
+    com.esotericsoftware.minlog.Log.set(2);
+    Kryo kryo = new Kryo();
+    Input inp = new Input(in);
+    T t = kryo.readObject(inp,clazz);
+    inp.close();
+    return t;
+  }
+
+  public static MapredWork copyMRWork(MapredWork work) {
     try {
-      d = new XMLDecoder(in, null, null);
-      return (T) d.readObject();
-    } finally {
-      if (null != d) {
-        d.close();
-      }
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      serializeObject(work, baos);
+      ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+      try {
+        return deserializeObject(in, MapredWork.class);
+      } finally {
+        in.close();
+      }
+    } catch(IOException e) {
+      // exceedingly unlikely
+      throw new RuntimeException(e);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index eb9205b..2cc70f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -716,12 +716,12 @@ public static void main(String[] args) throws IOException, HiveException {
 
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+      MapredLocalWork plan = Utilities.deserializeObject(pathData, MapredLocalWork.class);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeFromChildJVM(new DriverContext());
     } else {
-      MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+      MapredWork plan = Utilities.deserializeObject(pathData, MapredWork.class);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index cd872b2..b0e6ef3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -35,10 +35,10 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 49a0ee3..ff5f3d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -39,14 +39,11 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import
org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.mapred.JobConf; /** * Extension of ExecDriver: * - can optionally spawn a map-reduce task from a separate jvm @@ -69,7 +66,6 @@ private transient ContentSummary inputSummary = null; private transient boolean runningViaChild = false; - private transient boolean inputSizeEstimated = false; private transient long totalInputFileSize; private transient long totalInputNumFiles; @@ -79,10 +75,6 @@ public MapRedTask() { super(); } - public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException { - throw new RuntimeException("Illegal Constructor call"); - } - @Override public int execute(DriverContext driverContext) { @@ -408,7 +400,7 @@ private void setNumberOfReducers() throws IOException { if (inputSummary == null) { inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null); } - int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), + int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), work.isFinalMapRed()); rWork.setNumReduceTasks(reducers); console diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java index c8354ff..f834538 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java @@ -144,9 +144,9 @@ public void setSortedColsByDirectory(Map> sortedColsByDire public static final class BucketCol implements BucketSortCol, Serializable { private static final long serialVersionUID = 1L; // Equivalent aliases for the column - private final List names = new ArrayList(); + List names = new ArrayList(); // Indexes of those equivalent columns - private final List indexes = new ArrayList(); + List indexes = new ArrayList(); public BucketCol(String name, int index) { addAlias(name, index); @@ -189,13 +189,18 @@ public String toString() { * to be constant for all equivalent columns. 
*/ public static final class SortCol implements BucketSortCol, Serializable { + + public SortCol() { + super(); + } + private static final long serialVersionUID = 1L; // Equivalent aliases for the column - private final List names = new ArrayList(); + List names = new ArrayList(); // Indexes of those equivalent columns - private final List indexes = new ArrayList(); + List indexes = new ArrayList(); // Sort order (+|-) - private final char sortOrder; + char sortOrder; public SortCol(String name, int index, char sortOrder) { this(sortOrder); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index e214807..35e3810 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.optimizer.physical; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.ArrayList; @@ -502,9 +500,10 @@ public static boolean cannotConvert(String bigTableAlias, } // deep copy a new mapred work from xml // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan - String xml = currTask.getWork().toXML(); - InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8")); - MapredWork newWork = Utilities.deserializeObject(in); + + MapredWork newWork = Utilities.copyMRWork(currTask.getWork()); + //MapredWork newWork = kryo.copy(currTask.getWork()); + // create map join task and set big table as i ObjectPair newTaskAlias = convertTaskToMapJoinTask(newWork, i); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index ee4d4d1..b6bbe5f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical; -import java.io.ByteArrayInputStream; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -241,16 +239,7 @@ public static void processSkewJoin(JoinOperator joinOp, HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS); newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns); - MapredWork clonePlan = null; - try { - String xmlPlan = currPlan.toXML(); - StringBuilder sb = new StringBuilder(xmlPlan); - ByteArrayInputStream bis; - bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8")); - clonePlan = Utilities.deserializeObject(bis); - } catch (UnsupportedEncodingException e) { - throw new SemanticException(e); - } + MapredWork clonePlan = Utilities.copyMRWork(currPlan); Operator[] parentOps = new TableScanOperator[tags.length]; for (int k = 0; k < tags.length; k++) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index da5115b..8d4d6d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -148,11 +148,8 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) { private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOperator oldSMBJoinOp) throws SemanticException { try { - String xml = currWork.toXML(); - // deep copy a new mapred work - InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8")); - MapredWork currJoinWork = Utilities.deserializeObject(in); + MapredWork currJoinWork = Utilities.copyMRWork(currWork); SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork); // Add the row resolver for the new operator @@ -169,14 +166,11 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera } // create map join task and set big table as bigTablePosition - private ObjectPair convertSMBTaskToMapJoinTask(String xml, + private ObjectPair convertSMBTaskToMapJoinTask(MapredWork newWork, int bigTablePosition, SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree) throws UnsupportedEncodingException, SemanticException { - // deep copy a new mapred work from xml - InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8")); - MapredWork newWork = Utilities.deserializeObject(in); // create a mapred task for this work MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext .getParseContext().getConf()); @@ -295,7 +289,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(), pathToAliases, aliasToSize); - String xml = currJoinWork.toXML(); long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVESMALLTABLESFILESIZE); @@ -307,7 +300,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) // create map join task for the given big table position ObjectPair newTaskAlias = - convertSMBTaskToMapJoinTask(xml, bigTablePosition, newSMBJoinOp, joinTree); + convertSMBTaskToMapJoinTask(Utilities.copyMRWork(currJoinWork), bigTablePosition, newSMBJoinOp, joinTree); MapRedTask newTask = newTaskAlias.getFirst(); String bigTableAlias = newTaskAlias.getSecond(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java index 0aaf47e..9834e73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java @@ -51,22 +51,22 @@ // keeps track of the right-hand-side table name of the left-semi-join, and // its list of join keys - private final HashMap> rhsSemijoin; + private transient final HashMap> rhsSemijoin; // join conditions - private ArrayList> expressions; + private transient ArrayList> expressions; // key index to nullsafe join flag private ArrayList nullsafes; // filters - private ArrayList> filters; + private transient ArrayList> filters; // outerjoin-pos = other-pos:filter-len, other-pos:filter-len, ... 
private int[][] filterMap; // filters for pushing - private ArrayList> filtersForPushing; + private transient ArrayList> filtersForPushing; // user asked for map-side join private boolean mapSideJoin; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java index 7298cff..f3203bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java @@ -18,12 +18,10 @@ package org.apache.hadoop.hive.ql.plan; -import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.mapred.JobConf; @@ -82,10 +80,4 @@ public void configureJobConf(JobConf job) { return ops; } - - public String toXML() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Utilities.serializeObject(this, baos); - return (baos.toString()); - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java index 4f28628..4a7caab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java @@ -160,15 +160,15 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticE */ public static class GenericUDAFnGramEvaluator extends GenericUDAFEvaluator { // For PARTIAL1 and COMPLETE: ObjectInspectors for original data - private StandardListObjectInspector outerInputOI; - private StandardListObjectInspector innerInputOI; - private PrimitiveObjectInspector inputOI; - private PrimitiveObjectInspector nOI; - private PrimitiveObjectInspector kOI; - private PrimitiveObjectInspector pOI; + private transient StandardListObjectInspector outerInputOI; + private transient StandardListObjectInspector innerInputOI; + private transient PrimitiveObjectInspector inputOI; + private transient PrimitiveObjectInspector nOI; + private transient PrimitiveObjectInspector kOI; + private transient PrimitiveObjectInspector pOI; // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations - private StandardListObjectInspector loi; + private transient StandardListObjectInspector loi; @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java index 4176d3f..c429367 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java @@ -38,8 +38,8 @@ @Description(name = "array", value = "_FUNC_(n0, n1...) 
- Creates an array with the given elements ") public class GenericUDFArray extends GenericUDF { - private Converter[] converters; - private ArrayList ret = new ArrayList(); + private transient Converter[] converters; + private transient ArrayList ret = new ArrayList(); @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBridge.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBridge.java index a9feba4..3e01364 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBridge.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFBridge.java @@ -57,7 +57,7 @@ /** * The underlying UDF class. */ - Class udfClass; + private Class udfClass; /** * The underlying UDF class Name. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFormatNumber.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFormatNumber.java index f23193e..fb47be0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFormatNumber.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFFormatNumber.java @@ -56,7 +56,7 @@ private transient ObjectInspector[] argumentOIs; private final Text resultText = new Text(); private final StringBuilder pattern = new StringBuilder(""); - private final DecimalFormat numberFormat = new DecimalFormat(""); + private transient final DecimalFormat numberFormat = new DecimalFormat(""); private int lastDValue = -1; @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java index 0659dfa..5911f2c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFIndex.java @@ -36,11 +36,11 @@ */ @Description(name = "index", value = "_FUNC_(a, n) - Returns the n-th element of a ") public class GenericUDFIndex extends GenericUDF { - private MapObjectInspector mapOI; + private transient MapObjectInspector mapOI; private boolean mapKeyPreferWritable; - private ListObjectInspector listOI; - private PrimitiveObjectInspector indexOI; - private ObjectInspector returnOI; + private transient ListObjectInspector listOI; + private transient PrimitiveObjectInspector indexOI; + private transient ObjectInspector returnOI; @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFNamedStruct.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFNamedStruct.java index 2c18d93..7b18d82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFNamedStruct.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFNamedStruct.java @@ -35,7 +35,7 @@ value = "_FUNC_(name1, val1, name2, val2, ...) 
- Creates a struct with the given " + "field names and values") public class GenericUDFNamedStruct extends GenericUDF { - Object[] ret; + private transient Object[] ret; @Override public ObjectInspector initialize(ObjectInspector[] arguments) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFStruct.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFStruct.java index 258c0bf..7c8dfcf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFStruct.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFStruct.java @@ -31,7 +31,7 @@ @Description(name = "struct", value = "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values") public class GenericUDFStruct extends GenericUDF { - Object[] ret; + private transient Object[] ret; @Override public ObjectInspector initialize(ObjectInspector[] arguments) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java index de2dfb1..9aec825 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java @@ -77,8 +77,8 @@ public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgume fieldOIs); } - private final Object[] forwardListObj = new Object[1]; - private final Object[] forwardMapObj = new Object[2]; + private transient final Object[] forwardListObj = new Object[1]; + private transient final Object[] forwardMapObj = new Object[2]; @Override public void process(Object[] o) throws HiveException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java index 490198a..521f4e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java @@ -63,9 +63,9 @@ int numCols; // number of output columns String[] paths; // array of path expressions, each of which corresponds to a column - Text[] retCols; // array of returned column values - Text[] cols; // object pool of non-null Text, avoid creating objects all the time - Object[] nullCols; // array of null column values + private transient Text[] retCols; // array of returned column values + private transient Text[] cols; // object pool of non-null Text, avoid creating objects all the time + private transient Object[] nullCols; // array of null column values private transient ObjectInspector[] inputOIs; // input ObjectInspectors boolean pathParsed = false; boolean seenErrors = false; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFParseUrlTuple.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFParseUrlTuple.java index 96ec55f..dff9ba6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFParseUrlTuple.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFParseUrlTuple.java @@ -18,11 +18,11 @@ package org.apache.hadoop.hive.ql.udf.generic; -import java.net.URL; import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; -import java.util.regex.Pattern; import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,13 +65,13 @@ PARTNAME[] partnames; // mapping from pathnames to enum 
PARTNAME Text[] retCols; // array of returned column values Text[] cols; // object pool of non-null Text, avoid creating objects all the time - Object[] nullCols; // array of null column values - ObjectInspector[] inputOIs; // input ObjectInspectors + private transient Object[] nullCols; // array of null column values + private transient ObjectInspector[] inputOIs; // input ObjectInspectors boolean pathParsed = false; boolean seenErrors = false; - URL url = null; - Pattern p = null; - String lastKey = null; + private transient URL url = null; + private transient Pattern p = null; + private transient String lastKey = null; @Override public void close() throws HiveException { @@ -122,7 +122,7 @@ public StructObjectInspector initialize(ObjectInspector[] args) // all returned type will be Text fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); } - + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @@ -137,7 +137,7 @@ public void process(Object[] o) throws HiveException { if (!pathParsed) { for (int i = 0;i < numCols; ++i) { paths[i] = ((StringObjectInspector) inputOIs[i+1]).getPrimitiveJavaObject(o[i+1]); - + if (paths[i] == null) { partnames[i] = PARTNAME.NULLNAME; } else if (paths[i].equals("HOST")) { @@ -158,11 +158,11 @@ public void process(Object[] o) throws HiveException { partnames[i] = PARTNAME.USERINFO; } else if (paths[i].startsWith("QUERY:")) { partnames[i] = PARTNAME.QUERY_WITH_KEY; - paths[i] = paths[i].substring(6); // update paths[i], e.g., from "QUERY:id" to "id" + paths[i] = paths[i].substring(6); // update paths[i], e.g., from "QUERY:id" to "id" } else { partnames[i] = PARTNAME.NULLNAME; - } - } + } + } pathParsed = true; } @@ -171,9 +171,9 @@ public void process(Object[] o) throws HiveException { forward(nullCols); return; } - + try { - String ret = null; + String ret = null; url = new URL(urlStr); for (int i = 0; i < numCols; ++i) { ret = evaluate(url, i); @@ -188,7 +188,7 @@ public void process(Object[] o) throws HiveException { } forward(retCols); - return; + return; } catch (MalformedURLException e) { // parsing error, invalid url string if (!seenErrors) { @@ -204,10 +204,11 @@ public void process(Object[] o) throws HiveException { public String toString() { return "parse_url_tuple"; } - + private String evaluate(URL url, int index) { - if (url == null || index < 0 || index >= partnames.length) + if (url == null || index < 0 || index >= partnames.length) { return null; + } switch (partnames[index]) { case HOST : return url.getHost(); @@ -239,5 +240,5 @@ private String evaluateQuery(String query, String key) { return m.group(2); } return null; - } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFStack.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFStack.java index 2c423fa..1a744e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFStack.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFStack.java @@ -48,7 +48,7 @@ public void close() throws HiveException { } private transient List argOIs = new ArrayList(); - Object[] forwardObj = null; + private transient Object[] forwardObj = null; private transient ArrayList returnOIResolvers = new ArrayList(); IntWritable numRows = null;
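
A few standalone sketches follow; none of them are part of the patch. The core of the patch is the pair of helpers in Utilities that replace XMLEncoder/XMLDecoder with Kryo. Below is a minimal round trip using the Kryo 2.21 API added to ivy/libraries.properties above. SamplePlan is a made-up stand-in for a work object such as MapWork; its transient field mirrors the runtime-only fields the rest of the patch marks transient, since Kryo's default FieldSerializer does not write transient fields.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoRoundTripSketch {

  // Hypothetical stand-in for a plan object such as MapWork or ReduceWork.
  public static class SamplePlan {
    String queryId = "q1";            // plan state: serialized
    transient Object runtimeCache;    // runtime-only state: skipped by Kryo's FieldSerializer
    public SamplePlan() { }           // no-arg constructor, as Kryo's default instantiation expects
  }

  public static void main(String[] args) {
    SamplePlan original = new SamplePlan();
    original.runtimeCache = new Object();

    Kryo kryo = new Kryo();

    // Write side: the same calls the new Utilities.serializeObject(plan, out) makes.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Output output = new Output(bos);
    kryo.writeObject(output, original);
    output.close();

    // Read side: the same calls the new Utilities.deserializeObject(in, clazz) makes.
    Input input = new Input(new ByteArrayInputStream(bos.toByteArray()));
    SamplePlan copy = kryo.readObject(input, SamplePlan.class);
    input.close();

    System.out.println(copy.queryId);        // "q1": non-transient state survives
    System.out.println(copy.runtimeCache);   // null: transient state is not carried over
  }
}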
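
The new Utilities.copyMRWork replaces the old toXML()/deserializeObject deep-copy path used by CommonJoinTaskDispatcher, GenMRSkewJoinProcessor and SortMergeJoinTaskDispatcher. This sketch shows the same serialize-then-deserialize cloning idea with a hypothetical PlanNode class; the patch applies it specifically to MapredWork.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class DeepCopySketch {

  // Hypothetical stand-in for MapredWork.
  public static class PlanNode {
    List<String> aliases = new ArrayList<String>();
    public PlanNode() { }
  }

  // Deep copy by serializing to an in-memory buffer and reading it back, the copyMRWork idea.
  public static <T> T copyByRoundTrip(T obj, Class<T> clazz) {
    Kryo kryo = new Kryo();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Output output = new Output(bos);
    kryo.writeObject(output, obj);
    output.close();
    Input input = new Input(new ByteArrayInputStream(bos.toByteArray()));
    T copy = kryo.readObject(input, clazz);
    input.close();
    return copy;
  }

  public static void main(String[] args) {
    PlanNode original = new PlanNode();
    original.aliases.add("t1");

    PlanNode copy = copyByRoundTrip(original, PlanNode.class);
    copy.aliases.add("t2");

    System.out.println(original.aliases); // [t1]     : untouched by changes to the copy
    System.out.println(copy.aliases);     // [t1, t2] : an independent object graph
  }
}

The commented-out kryo.copy(currTask.getWork()) line left in CommonJoinTaskDispatcher suggests Kryo's in-memory copy facility as a possible later alternative to the byte-array round trip.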
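
The SortCol change (a new no-arg constructor, fields no longer final) fits how Kryo's default FieldSerializer builds objects: it instantiates through a no-arg constructor and then assigns fields one by one; the field changes appear to serve the same purpose. A rough illustration with a made-up Col class, not Hive code:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class NoArgConstructorSketch {

  // Made-up column descriptor, shaped like the patched SortCol.
  public static class Col {
    String name;       // non-final in the patch; kept writable for field-by-field deserialization
    char sortOrder;

    public Col() { }   // the constructor Kryo's default instantiation calls

    public Col(String name, char sortOrder) {
      this.name = name;
      this.sortOrder = sortOrder;
    }
  }

  public static void main(String[] args) {
    Kryo kryo = new Kryo();

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Output out = new Output(bos);
    kryo.writeObject(out, new Col("key", '+'));
    out.close();

    Input in = new Input(new ByteArrayInputStream(bos.toByteArray()));
    // Without a no-arg constructor (and no InstantiatorStrategy configured),
    // this readObject call would fail with a KryoException.
    Col col = kryo.readObject(in, Col.class);
    in.close();

    System.out.println(col.name + " " + col.sortOrder); // key +
  }
}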