diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java
index 0bd788e..032322a 100644
--- a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java
@@ -36,8 +36,8 @@
*/
public class GenericUDTFCount2 extends GenericUDTF {
- Integer count = Integer.valueOf(0);
- Object forwardObj[] = new Object[1];
+ private transient Integer count = Integer.valueOf(0);
+ private transient Object forwardObj[] = new Object[1];
@Override
public void close() throws HiveException {
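Note on the two UDTF changes in this patch: these scratch fields end up inside the Kryo-serialized query plan (the UDTF instance is referenced from its UDTFDesc), and Kryo's default FieldSerializer skips transient fields on write, re-creating them from their field initializers on read. A minimal stand-alone sketch of that behaviour, assuming Kryo 2.x defaults (the Holder class below is purely illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class TransientFieldDemo {
      static class Holder {
        int kept = 1;              // written by Kryo's FieldSerializer
        transient int scratch = 1; // skipped by Kryo's FieldSerializer
      }

      public static void main(String[] args) {
        Holder h = new Holder();
        h.kept = 42;
        h.scratch = 99; // this value is never serialized

        Kryo kryo = new Kryo();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, h);
        output.close();

        Holder copy = kryo.readObject(
            new Input(new ByteArrayInputStream(bytes.toByteArray())), Holder.class);
        // Prints "42 1": kept survived, scratch is back to its field initializer.
        System.out.println(copy.kept + " " + copy.scratch);
      }
    }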
diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java
index da5ae9c..a4dd152 100644
--- a/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java
@@ -67,7 +67,7 @@ public StructObjectInspector initialize(ObjectInspector[] args)
fieldOIs);
}
- Object forwardObj[] = new Object[2];
+ private transient Object forwardObj[] = new Object[2];
@Override
public void process(Object[] o) throws HiveException {
diff --git a/hcatalog/pom.xml b/hcatalog/pom.xml
index 81f1e71..3c454f0 100644
--- a/hcatalog/pom.xml
+++ b/hcatalog/pom.xml
@@ -215,7 +215,16 @@
       <enabled>false</enabled>
     </snapshots>
   </repository>
-
+  <repository>
+    <id>sonatype-snapshots</id>
+    <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
+    <releases>
+      <enabled>false</enabled>
+    </releases>
+    <snapshots>
+      <enabled>true</enabled>
+    </snapshots>
+  </repository>
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index d230f2c..51ce5ac 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -47,6 +47,10 @@
checkmodified="${ivy.checkmodified}"
changingPattern="${ivy.changingPattern}"/>
+
+
@@ -68,6 +72,7 @@
+
@@ -77,11 +82,13 @@
+
+
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index c306440..aa8e742 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -51,6 +51,7 @@ jetty.version=6.1.26
jline.version=0.9.94
json.version=20090211
junit.version=4.10
+kryo.version=2.22-SNAPSHOT
libfb303.version=0.9.0
libthrift.version=0.9.0
log4j.version=1.2.16
diff --git a/ql/build.xml b/ql/build.xml
index d185e51..7398408 100644
--- a/ql/build.xml
+++ b/ql/build.xml
@@ -250,7 +250,15 @@
-
+
+
+
+
+
+
@@ -296,6 +304,7 @@
+
+
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e27222f..710102f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -466,7 +466,7 @@ public int compile(String command, boolean resetTaskIds) {
// deserialize the queryPlan
FileInputStream fis = new FileInputStream(queryPlanFileName);
- QueryPlan newPlan = Utilities.deserializeObject(fis);
+ QueryPlan newPlan = Utilities.deserializeObject(fis, QueryPlan.class);
fis.close();
// Use the deserialized plan
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
index ee44051..5ee2054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
@@ -54,7 +54,7 @@
*/
private boolean isVirtualCol;
- private transient ObjectInspector objectInspector;
+ private ObjectInspector objectInspector;
private boolean isHiddenVirtualCol;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
index 3f1e001..24c8281 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
@@ -31,12 +31,12 @@
*/
public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
- transient boolean simpleCase;
- transient StructObjectInspector inspector;
- transient StructField field;
- transient StructObjectInspector[] inspectors;
- transient StructField[] fields;
- transient boolean[] unionField;
+ private transient boolean simpleCase;
+ private transient StructObjectInspector inspector;
+ private transient StructField field;
+ private transient StructObjectInspector[] inspectors;
+ private transient StructField[] fields;
+ private transient boolean[] unionField;
public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
super(expr);
@@ -61,18 +61,17 @@ public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveExcep
fields = new StructField[names.length];
unionField = new boolean[names.length];
int unionIndex = -1;
-
for (int i = 0; i < names.length; i++) {
if (i == 0) {
inspectors[0] = (StructObjectInspector) rowInspector;
} else {
- if (unionIndex != -1) {
+ if (unionIndex == -1) {
+ inspectors[i] = (StructObjectInspector) fields[i - 1]
+ .getFieldObjectInspector();
+ } else {
inspectors[i] = (StructObjectInspector) (
(UnionObjectInspector)fields[i-1].getFieldObjectInspector()).
getObjectInspectors().get(unionIndex);
- } else {
- inspectors[i] = (StructObjectInspector) fields[i - 1]
- .getFieldObjectInspector();
}
}
// to support names like _colx:1._coly
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 70cbff8..e00b7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -38,7 +38,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
this.outputObjInspector = serde.getObjectInspector();
initializeChildren(hconf);
} catch (Exception e) {
- LOG.error("Generating output obj inspector from dummy object error");
+ LOG.error("Generating output obj inspector from dummy object error", e);
e.printStackTrace();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 458d259..49a1834 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -196,7 +196,7 @@ public void setDone(boolean done) {
}
// non-bean fields needed during compilation
- private transient RowSchema rowSchema;
+ private RowSchema rowSchema;
public void setSchema(RowSchema rowSchema) {
this.rowSchema = rowSchema;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
index 234bbcd..1bfcee6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
@@ -51,8 +51,10 @@ public void setSignature(ArrayList signature) {
@Override
public String toString() {
StringBuilder sb = new StringBuilder('(');
- for (ColumnInfo col: signature) {
- sb.append(col.toString());
+ if (signature != null) {
+ for (ColumnInfo col : signature) {
+ sb.append(col.toString());
+ }
}
sb.append(')');
return sb.toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index edb55fa..7404a0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -27,6 +27,7 @@
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -34,6 +35,8 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -76,9 +79,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.antlr.runtime.CommonToken;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
import org.apache.commons.logging.Log;
@@ -112,6 +117,9 @@
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -165,6 +173,11 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+
/**
* Utilities.
*
@@ -268,8 +281,19 @@ public static BaseWork getBaseWork(Configuration conf, String name) {
localPath = new Path(name);
}
InputStream in = new FileInputStream(localPath.toUri().getPath());
- BaseWork ret = deserializeObject(in);
- gWork = ret;
+    if (MAP_PLAN_NAME.equals(name)) {
+      if ("org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper".equals(conf.get("mapred.mapper.class"))) {
+        gWork = deserializeObject(in, MergeWork.class);
+      } else if ("org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper".equals(conf.get("mapred.mapper.class"))) {
+        gWork = deserializeObject(in, ColumnTruncateWork.class);
+      } else if ("org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper".equals(conf.get("mapred.mapper.class"))) {
+        gWork = deserializeObject(in, PartialScanWork.class);
+      } else {
+        gWork = deserializeObject(in, MapWork.class);
+      }
+    } else {
+      gWork = deserializeObject(in, ReduceWork.class);
+    }
gWorkMap.put(path, gWork);
}
return gWork;
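The dispatch on mapred.mapper.class above exists because kryo.readObject() must be told the concrete class to instantiate, unlike the old XMLDecoder path, which recovered the type from the XML itself. A stand-alone sketch of the alternative (not what this patch does), where writeClassAndObject/readClassAndObject put the class name into the stream so the reader needs no dispatch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class SelfDescribingStreamDemo {
      // The class name travels with the data, so the reader needs no up-front class.
      static byte[] write(Kryo kryo, Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeClassAndObject(output, obj);
        output.close();
        return bytes.toByteArray();
      }

      static Object read(Kryo kryo, byte[] data) {
        Input input = new Input(new ByteArrayInputStream(data));
        Object restored = kryo.readClassAndObject(input);
        input.close();
        return restored;
      }

      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        Object back = read(kryo, write(kryo, new ArrayList<String>()));
        System.out.println(back.getClass()); // class java.util.ArrayList
      }
    }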
@@ -586,11 +610,47 @@ protected void initialize(Class type, Object oldInstance, Object newInstance, En
}
}
- /**
- * Serialize the object. This helper function mainly makes sure that enums,
- * counters, etc are handled properly.
- */
- public static void serializeObject(Object plan, OutputStream out) {
+ private static class CommonTokenSerializer extends com.esotericsoftware.kryo.Serializer<CommonToken> {
+
+ @Override
+ public CommonToken read(Kryo kryo, Input input, Class<CommonToken> clazz) {
+ return new CommonToken(input.readInt(), input.readString());
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, CommonToken token) {
+ output.writeInt(token.getType());
+ output.writeString(token.getText());
+ }
+
+ }
+
+ /*
+ * This helper function is ONLY FOR DEBUGGING.
+ * It writes the plan to a file and can be used to isolate any Kryo bug.
+ * It needs to be removed before the final review.
+ */
+ private static void createPlanXml(Object plan) {
+ try {
+ File fl = new File("/tmp/myplan.xml-" + counter.incrementAndGet());
+ // if (fl.exists() == false) {
+ LOG.info("Plan written to file : " + fl.getAbsolutePath() + " plan class :" + plan.getClass());
+ BufferedWriter bw = new BufferedWriter(new FileWriter(fl));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ createPlanXml(plan, baos);
+ bw.write(baos.toString());
+ bw.close();
+ // } else {
+ // LOG.info("File already exists "+ fl.getAbsolutePath());
+ // }
+ } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
+
+ }
+
+ private static void createPlanXml(Object plan, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
e.setExceptionListener(new ExceptionListener() {
public void exceptionThrown(Exception e) {
@@ -613,19 +673,73 @@ public void exceptionThrown(Exception e) {
}
/**
+ * Serialize the object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
+ * @throws IOException
+ */
+ public static void serializeObject(Object plan, OutputStream out) throws IOException {
+ createPlanXml(plan); // For DEBUGGING ONLY. Comment out before the final review.
+ com.esotericsoftware.minlog.Log.trace("START SERIALIZE");
+ Kryo kryo = new Kryo();
+ kryo.register(CommonToken.class, new CommonTokenSerializer());
+ Output output = new Output(out);
+ kryo.writeObject(output, plan);
+ output.close();
+ com.esotericsoftware.minlog.Log.trace("END SERIALIZE");
+ }
+
+ /**
* De-serialize an object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
- @SuppressWarnings("unchecked")
- public static <T> T deserializeObject(InputStream in) {
- XMLDecoder d = null;
+ public static <T> T deserializeObject(InputStream in, Class<T> clazz) {
+ com.esotericsoftware.minlog.Log.trace("START DESERIALIZE");
+ Kryo kryo = new Kryo();
+ kryo.register(CommonToken.class, new CommonTokenSerializer());
+ Input inp = new Input(in);
+ T t = kryo.readObject(inp, clazz);
+ inp.close();
+ com.esotericsoftware.minlog.Log.trace("END DESERIALIZE");
+ return t;
+ }
+
+ public static void main(String[] args) throws Exception {
try {
- d = new XMLDecoder(in, null, null);
- return (T) d.readObject();
- } finally {
- if (null != d) {
- d.close();
+ com.esotericsoftware.minlog.Log.TRACE();
+ deserializeObject(new FileInputStream("/tmp/kryo-mapredwork-2"), MapredWork.class);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private static final AtomicInteger counter = new AtomicInteger(0);
+
+ public static MapredWork copyMRWork(MapredWork work) {
+ try {
+ com.esotericsoftware.minlog.Log.TRACE();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ serializeObject(work, baos);
+ byte[] bytes = baos.toByteArray();
+ FileOutputStream out = null;
+ try {
+ out = new FileOutputStream(new File("/tmp/kryo-mapredwork-" + counter.incrementAndGet()));
+ out.write(bytes);
+ } finally {
+ if(out != null) {
+ out.close();
+ }
+ }
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ try {
+ return deserializeObject(in, MapredWork.class);
+ } finally {
+ in.close();
}
+ } catch(IOException e) {
+ // exceedingly unlikely
+ throw new RuntimeException(e);
+ } finally {
+ com.esotericsoftware.minlog.Log.WARN();
}
}
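The CommonTokenSerializer registered in serializeObject/deserializeObject round-trips only a token's type and text; line, character position, and start/stop offsets of the ANTLR CommonToken are not carried over, which is presumably acceptable for re-executing a plan. A stand-alone sketch of the same idea (the serializer is re-declared below only because the one above is private to Utilities; the token type 26 is an arbitrary illustrative value):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.antlr.runtime.CommonToken;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class CommonTokenRoundTrip {
      // Same shape as the serializer in Utilities: only type and text survive.
      static class TokenSerializer extends Serializer<CommonToken> {
        @Override
        public void write(Kryo kryo, Output output, CommonToken token) {
          output.writeInt(token.getType());
          output.writeString(token.getText());
        }

        @Override
        public CommonToken read(Kryo kryo, Input input, Class<CommonToken> type) {
          return new CommonToken(input.readInt(), input.readString());
        }
      }

      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(CommonToken.class, new TokenSerializer());

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, new CommonToken(26, "src"));
        output.close();

        CommonToken back = kryo.readObject(
            new Input(new ByteArrayInputStream(bytes.toByteArray())), CommonToken.class);
        System.out.println(back.getType() + " " + back.getText()); // 26 src
        // back.getLine() / getCharPositionInLine() are left at CommonToken defaults.
      }
    }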
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index eb9205b..2cc70f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -716,12 +716,12 @@ public static void main(String[] args) throws IOException, HiveException {
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+ MapredLocalWork plan = Utilities.deserializeObject(pathData, MapredLocalWork.class);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext());
} else {
- MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+ MapredWork plan = Utilities.deserializeObject(pathData, MapredWork.class);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index cd872b2..b0e6ef3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -35,10 +35,10 @@
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.session.SessionState;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 49a0ee3..ff5f3d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -39,14 +39,11 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.JobConf;
/**
* Extension of ExecDriver:
* - can optionally spawn a map-reduce task from a separate jvm
@@ -69,7 +66,6 @@
private transient ContentSummary inputSummary = null;
private transient boolean runningViaChild = false;
- private transient boolean inputSizeEstimated = false;
private transient long totalInputFileSize;
private transient long totalInputNumFiles;
@@ -79,10 +75,6 @@ public MapRedTask() {
super();
}
- public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
- throw new RuntimeException("Illegal Constructor call");
- }
-
@Override
public int execute(DriverContext driverContext) {
@@ -408,7 +400,7 @@ private void setNumberOfReducers() throws IOException {
if (inputSummary == null) {
inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
- int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
work.isFinalMapRed());
rWork.setNumReduceTasks(reducers);
console
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
index c8354ff..296fecb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
@@ -36,19 +36,19 @@
*/
public class BucketingSortingCtx implements NodeProcessorCtx {
- boolean disableBucketing;
+ private boolean disableBucketing;
// A mapping from an operator to the columns by which it's output is bucketed
- Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
+ private Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
// A mapping from a directory which a FileSinkOperator writes into to the columns by which that
// output is bucketed
- Map<String, List<BucketCol>> bucketedColsByDirectory;
+ private Map<String, List<BucketCol>> bucketedColsByDirectory;
// A mapping from an operator to the columns by which it's output is sorted
- Map<Operator<? extends OperatorDesc>, List<SortCol>> sortedColsByOp;
+ private Map<Operator<? extends OperatorDesc>, List<SortCol>> sortedColsByOp;
// A mapping from a directory which a FileSinkOperator writes into to the columns by which that
// output is sorted
- Map<String, List<SortCol>> sortedColsByDirectory;
+ private Map<String, List<SortCol>> sortedColsByDirectory;
public BucketingSortingCtx(boolean disableBucketing) {
this.disableBucketing = disableBucketing;
@@ -189,13 +189,18 @@ public String toString() {
* to be constant for all equivalent columns.
*/
public static final class SortCol implements BucketSortCol, Serializable {
+
+ public SortCol() {
+ super();
+ }
+
private static final long serialVersionUID = 1L;
// Equivalent aliases for the column
- private final List<String> names = new ArrayList<String>();
+ private List<String> names = new ArrayList<String>();
// Indexes of those equivalent columns
- private final List<Integer> indexes = new ArrayList<Integer>();
+ private List<Integer> indexes = new ArrayList<Integer>();
// Sort order (+|-)
- private final char sortOrder;
+ private char sortOrder;
public SortCol(String name, int index, char sortOrder) {
this(sortOrder);
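SortCol gains a public no-arg constructor and loses final on its fields because Kryo creates the instance first and then populates the fields: with only the (String, int, char) and (char) constructors available, Kryo 2.x's default instantiation fails, and the final modifiers presumably came off so the deserializer can write the fields back. A sketch of an alternative, leaving the class untouched and configuring Kryo to instantiate without calling a constructor, under the assumption that Objenesis is on the classpath:

    import com.esotericsoftware.kryo.Kryo;

    import org.objenesis.strategy.StdInstantiatorStrategy;

    public class NoArgConstructorAlternative {
      // Illustrative class shaped like the pre-patch SortCol: no zero-arg constructor,
      // a final field set only through the constructor.
      static class Pinned {
        final int index;
        Pinned(int index) { this.index = index; }
      }

      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Objenesis-backed instantiation creates objects without calling any constructor,
        // so a class like Pinned stays instantiable without adding a no-arg constructor.
        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
        Pinned p = kryo.newInstance(Pinned.class);
        System.out.println(p.index); // 0: no constructor ran, the field has its default value
      }
    }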
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index 14a4b11..7af2a9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -502,9 +500,10 @@ public static boolean cannotConvert(String bigTableAlias,
}
// deep copy a new mapred work from xml
// Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
- String xml = currTask.getWork().toXML();
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeObject(in);
+
+ MapredWork newWork = Utilities.copyMRWork(currTask.getWork());
+ //MapredWork newWork = kryo.copy(currTask.getWork());
+
// create map join task and set big table as i
ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
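Utilities.copyMRWork replaces the old XML round-trip clone (see the HIVE-4396 comment above) with a Kryo byte round-trip; the commented-out line hints at Kryo's in-memory graph copy, which avoids the byte stream entirely. A stand-alone sketch of that idea on a toy object graph, as an assumption about a possible follow-up; the real plan would also need copy support for the registered CommonToken serializer:

    import java.util.ArrayList;
    import java.util.List;

    import com.esotericsoftware.kryo.Kryo;

    public class KryoCopyDemo {
      static class Node {
        String name;
        List<Node> children = new ArrayList<Node>();
      }

      public static void main(String[] args) {
        Node root = new Node();
        root.name = "root";
        Node child = new Node();
        child.name = "child";
        root.children.add(child);

        Kryo kryo = new Kryo();
        Node copy = kryo.copy(root); // deep copy of the object graph, no byte stream involved

        copy.children.get(0).name = "changed";
        System.out.println(child.name); // still "child": the copy is fully independent
      }
    }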
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index ee4d4d1..b6bbe5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -241,16 +239,7 @@ public static void processSkewJoin(JoinOperator joinOp,
HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
- MapredWork clonePlan = null;
- try {
- String xmlPlan = currPlan.toXML();
- StringBuilder sb = new StringBuilder(xmlPlan);
- ByteArrayInputStream bis;
- bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
- clonePlan = Utilities.deserializeObject(bis);
- } catch (UnsupportedEncodingException e) {
- throw new SemanticException(e);
- }
+ MapredWork clonePlan = Utilities.copyMRWork(currPlan);
Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
for (int k = 0; k < tags.length; k++) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
index 85a2bdf..c3077ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
@@ -163,7 +163,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object..
if (parent instanceof TableScanOperator) {
tbl = ((TableScanOperator) parent).getTableDesc();
} else {
- throw new SemanticException();
+ throw new SemanticException("Parent not TableScanOperator as expected. Please report.");
}
} else {
// get parent schema
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
index 35122c2..0265bbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -148,11 +146,8 @@ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOperator oldSMBJoinOp)
throws SemanticException {
try {
- String xml = currWork.toXML();
-
// deep copy a new mapred work
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork currJoinWork = Utilities.deserializeObject(in);
+ MapredWork currJoinWork = Utilities.copyMRWork(currWork);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
// Add the row resolver for the new operator
@@ -169,14 +164,11 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera
}
// create map join task and set big table as bigTablePosition
- private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(String xml,
+ private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(MapredWork newWork,
int bigTablePosition,
SMBMapJoinOperator smbJoinOp,
QBJoinTree joinTree)
throws UnsupportedEncodingException, SemanticException {
- // deep copy a new mapred work from xml
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeObject(in);
// create a mapred task for this work
MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
.getParseContext().getConf());
@@ -240,7 +232,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
ConditionalTask conditionalTask, Context context)
throws SemanticException {
-
// whether it contains a sort merge join operator
MapredWork currWork = currTask.getWork();
SMBMapJoinOperator originalSMBJoinOp = getSMBMapJoinOp(currWork);
@@ -290,7 +281,6 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(),
pathToAliases, aliasToSize);
- String xml = currJoinWork.toXML();
long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
@@ -302,7 +292,7 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp)
// create map join task for the given big table position
ObjectPair<MapRedTask, String> newTaskAlias =
- convertSMBTaskToMapJoinTask(xml, bigTablePosition, newSMBJoinOp, joinTree);
+ convertSMBTaskToMapJoinTask(Utilities.copyMRWork(currJoinWork), bigTablePosition, newSMBJoinOp, joinTree);
MapRedTask newTask = newTaskAlias.getFirst();
String bigTableAlias = newTaskAlias.getSecond();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
index 0aaf47e..9834e73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
@@ -51,22 +51,22 @@
// keeps track of the right-hand-side table name of the left-semi-join, and
// its list of join keys
- private final HashMap<String, ArrayList<ASTNode>> rhsSemijoin;
+ private transient final HashMap<String, ArrayList<ASTNode>> rhsSemijoin;
// join conditions
- private ArrayList<ArrayList<ASTNode>> expressions;
+ private transient ArrayList<ArrayList<ASTNode>> expressions;
// key index to nullsafe join flag
private ArrayList<Boolean> nullsafes;
// filters
- private ArrayList<ArrayList<ASTNode>> filters;
+ private transient ArrayList<ArrayList<ASTNode>> filters;
// outerjoin-pos = other-pos:filter-len, other-pos:filter-len, ...
private int[][] filterMap;
// filters for pushing
- private ArrayList<ArrayList<ASTNode>> filtersForPushing;
+ private transient ArrayList<ArrayList<ASTNode>> filtersForPushing;
// user asked for map-side join
private boolean mapSideJoin;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
index 85adbde..d177535 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
@@ -264,7 +264,11 @@ public boolean getIsExprResolver() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
-
+ // XXX toString() is invoked by Kryo when tracing and fails after this class has been
+ // deserialized; the commented-out early return below was used to bypass it while debugging.
+ /*if(true) {
+ return "RowResolver";
+ }*/
for (Map.Entry<String, LinkedHashMap<String, ColumnInfo>> e : rslvMap
.entrySet()) {
String tab = e.getKey();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index ff90bc8..0c82a9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -58,7 +58,7 @@
@SuppressWarnings({"serial", "deprecation"})
public class MapWork extends BaseWork {
- private static transient final Log LOG = LogFactory.getLog(MapWork.class);
+ private static final Log LOG = LogFactory.getLog(MapWork.class);
private boolean hadoopSupportsSplittable;
@@ -110,7 +110,7 @@
// used to indicate the input is sorted, and so a BinarySearchRecordReader shoudl be used
private boolean inputFormatSorted = false;
- private transient boolean useBucketizedHiveInputFormat;
+ private boolean useBucketizedHiveInputFormat;
public MapWork() {
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
index 7298cff..f3203bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
@@ -18,12 +18,10 @@
package org.apache.hadoop.hive.ql.plan;
-import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;
@@ -82,10 +80,4 @@ public void configureJobConf(JobConf job) {
return ops;
}
-
- public String toXML() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Utilities.serializeObject(this, baos);
- return (baos.toString());
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
index 226d041..9d39ad1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
@@ -52,7 +52,7 @@
*/
boolean isMapSide = false;
- HiveConf cfg;
+ transient HiveConf cfg;
static{
PTFUtils.makeTransient(PTFDesc.class, "llInfo");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 5f983db..644f0c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -47,7 +47,7 @@
private java.util.Properties properties;
private String serdeClassName;
- private transient String baseFileName;
+ private String baseFileName;
public void setBaseFileName(String baseFileName) {
this.baseFileName = baseFileName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
index 4f28628..4a7caab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
@@ -160,15 +160,15 @@ public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticE
*/
public static class GenericUDAFnGramEvaluator extends GenericUDAFEvaluator {
// For PARTIAL1 and COMPLETE: ObjectInspectors for original data
- private StandardListObjectInspector outerInputOI;
- private StandardListObjectInspector innerInputOI;
- private PrimitiveObjectInspector inputOI;
- private PrimitiveObjectInspector nOI;
- private PrimitiveObjectInspector kOI;
- private PrimitiveObjectInspector pOI;
+ private transient StandardListObjectInspector outerInputOI;
+ private transient StandardListObjectInspector innerInputOI;
+ private transient PrimitiveObjectInspector inputOI;
+ private transient PrimitiveObjectInspector nOI;
+ private transient PrimitiveObjectInspector kOI;
+ private transient PrimitiveObjectInspector pOI;
// For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations
- private StandardListObjectInspector loi;
+ private transient StandardListObjectInspector loi;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java
index 4176d3f..c429367 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java
@@ -38,8 +38,8 @@
@Description(name = "array",
value = "_FUNC_(n0, n1...) - Creates an array with the given elements ")
public class GenericUDFArray extends GenericUDF {
- private Converter[] converters;
- private ArrayList