Index: contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java (revision 1520407)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFCount2.java (working copy)
@@ -36,8 +36,8 @@
*/
public class GenericUDTFCount2 extends GenericUDTF {
- Integer count = Integer.valueOf(0);
- Object forwardObj[] = new Object[1];
+ private transient Integer count = Integer.valueOf(0);
+ private transient Object forwardObj[] = new Object[1];
@Override
public void close() throws HiveException {
Index: contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java
===================================================================
--- contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java (revision 1520407)
+++ contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java (working copy)
@@ -67,7 +67,7 @@
fieldOIs);
}
- Object forwardObj[] = new Object[2];
+ private transient Object forwardObj[] = new Object[2];
@Override
public void process(Object[] o) throws HiveException {
Index: hcatalog/pom.xml
===================================================================
--- hcatalog/pom.xml (revision 1520407)
+++ hcatalog/pom.xml (working copy)
@@ -215,7 +215,16 @@
false
-
+    <repository>
+      <id>sonatype-snapshots</id>
+      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
Index: ivy/ivysettings.xml
===================================================================
--- ivy/ivysettings.xml (revision 1520407)
+++ ivy/ivysettings.xml (working copy)
@@ -47,6 +47,10 @@
checkmodified="${ivy.checkmodified}"
changingPattern="${ivy.changingPattern}"/>
+
+
@@ -68,6 +72,7 @@
+
@@ -77,11 +82,13 @@
+
+
Index: ivy/libraries.properties
===================================================================
--- ivy/libraries.properties (revision 1520407)
+++ ivy/libraries.properties (working copy)
@@ -51,6 +51,7 @@
jline.version=0.9.94
json.version=20090211
junit.version=4.10
+kryo.version=2.22-SNAPSHOT
libfb303.version=0.9.0
libthrift.version=0.9.0
log4j.version=1.2.16
Index: ql/build.xml
===================================================================
--- ql/build.xml (revision 1520407)
+++ ql/build.xml (working copy)
@@ -250,6 +250,15 @@
+
+
+
+
+
+
+
@@ -298,7 +307,9 @@
-
+
+
+
Index: ql/ivy.xml
===================================================================
--- ql/ivy.xml (revision 1520407)
+++ ql/ivy.xml (working copy)
@@ -46,6 +46,8 @@
rev="${protobuf.version}" transitive="false"/>
+    <dependency org="com.esotericsoftware.kryo" name="kryo"
+      rev="${kryo.version}"/>
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -466,7 +466,7 @@
// deserialize the queryPlan
FileInputStream fis = new FileInputStream(queryPlanFileName);
- QueryPlan newPlan = Utilities.deserializeObject(fis);
+ QueryPlan newPlan = Utilities.deserializeObject(fis, QueryPlan.class);
fis.close();
// Use the deserialized plan
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (working copy)
@@ -54,7 +54,7 @@
*/
private boolean isVirtualCol;
- private transient ObjectInspector objectInspector;
+ private ObjectInspector objectInspector;
private boolean isHiddenVirtualCol;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (working copy)
@@ -31,12 +31,12 @@
*/
public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
- transient boolean simpleCase;
- transient StructObjectInspector inspector;
- transient StructField field;
- transient StructObjectInspector[] inspectors;
- transient StructField[] fields;
- transient boolean[] unionField;
+ private transient boolean simpleCase;
+ private transient StructObjectInspector inspector;
+ private transient StructField field;
+ private transient StructObjectInspector[] inspectors;
+ private transient StructField[] fields;
+ private transient boolean[] unionField;
public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
super(expr);
@@ -61,18 +61,17 @@
fields = new StructField[names.length];
unionField = new boolean[names.length];
int unionIndex = -1;
-
for (int i = 0; i < names.length; i++) {
if (i == 0) {
inspectors[0] = (StructObjectInspector) rowInspector;
} else {
- if (unionIndex != -1) {
+ if (unionIndex == -1) {
+ inspectors[i] = (StructObjectInspector) fields[i - 1]
+ .getFieldObjectInspector();
+ } else {
inspectors[i] = (StructObjectInspector) (
(UnionObjectInspector)fields[i-1].getFieldObjectInspector()).
getObjectInspectors().get(unionIndex);
- } else {
- inspectors[i] = (StructObjectInspector) fields[i - 1]
- .getFieldObjectInspector();
}
}
// to support names like _colx:1._coly
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (working copy)
@@ -478,7 +478,7 @@
       Class<? extends UDF> UDFClass, boolean isOperator, String displayName) {
if (UDF.class.isAssignableFrom(UDFClass)) {
FunctionInfo fI = new FunctionInfo(isNative, displayName,
- new GenericUDFBridge(displayName, isOperator, UDFClass));
+ new GenericUDFBridge(displayName, isOperator, UDFClass.getName()));
mFunctions.put(functionName.toLowerCase(), fI);
} else {
throw new RuntimeException("Registering UDF Class " + UDFClass
@@ -1107,7 +1107,7 @@
if (genericUDF instanceof GenericUDFBridge) {
GenericUDFBridge bridge = (GenericUDFBridge) genericUDF;
return new GenericUDFBridge(bridge.getUdfName(), bridge.isOperator(),
- bridge.getUdfClass());
+ bridge.getUdfClassName());
} else if (genericUDF instanceof GenericUDFMacro) {
GenericUDFMacro bridge = (GenericUDFMacro) genericUDF;
return new GenericUDFMacro(bridge.getMacroName(), bridge.getBody(),
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java (working copy)
@@ -38,7 +38,7 @@
this.outputObjInspector = serde.getObjectInspector();
initializeChildren(hconf);
} catch (Exception e) {
- LOG.error("Generating output obj inspector from dummy object error");
+ LOG.error("Generating output obj inspector from dummy object error", e);
e.printStackTrace();
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (working copy)
@@ -58,7 +58,7 @@
private static final long serialVersionUID = 1L;
- private Configuration configuration;
+ private transient Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (working copy)
@@ -51,8 +51,10 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder('(');
- for (ColumnInfo col: signature) {
- sb.append(col.toString());
+ if (signature != null) {
+ for (ColumnInfo col : signature) {
+ sb.append(col.toString());
+ }
}
sb.append(')');
return sb.toString();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -27,6 +27,7 @@
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -34,6 +35,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -76,9 +78,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.antlr.runtime.CommonToken;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
import org.apache.commons.logging.Log;
@@ -112,6 +116,9 @@
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -165,6 +172,11 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+
/**
* Utilities.
*
@@ -268,8 +280,19 @@
localPath = new Path(name);
}
InputStream in = new FileInputStream(localPath.toUri().getPath());
- BaseWork ret = deserializeObject(in);
- gWork = ret;
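+      // Kryo must be told the concrete class to deserialize into, so choose the
+      // BaseWork subclass from the mapper the job is configured with.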
+      if (MAP_PLAN_NAME.equals(name)) {
+        if ("org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, MergeWork.class);
+        } else if ("org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, ColumnTruncateWork.class);
+        } else if ("org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper".equals(conf.get("mapred.mapper.class"))) {
+          gWork = deserializeObject(in, PartialScanWork.class);
+        } else {
+          gWork = deserializeObject(in, MapWork.class);
+        }
+      } else {
+        gWork = deserializeObject(in, ReduceWork.class);
+      }
gWorkMap.put(path, gWork);
}
return gWork;
@@ -586,11 +609,61 @@
}
}
- /**
- * Serialize the object. This helper function mainly makes sure that enums,
- * counters, etc are handled properly.
- */
- public static void serializeObject(Object plan, OutputStream out) {
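+  // Serializes an ANTLR CommonToken by keeping only its type and text; source
+  // position and channel information is dropped, on the assumption that the
+  // reconstructed plan does not need it.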
+  private static class CommonTokenSerializer extends com.esotericsoftware.kryo.Serializer<CommonToken> {
+
+ @Override
+    public CommonToken read(Kryo kryo, Input input, Class<CommonToken> clazz) {
+ return new CommonToken(input.readInt(), input.readString());
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, CommonToken token) {
+ output.writeInt(token.getType());
+ output.writeString(token.getText());
+ }
+
+ }
+
+  private static class SqlDateSerializer extends com.esotericsoftware.kryo.Serializer<java.sql.Date> {
+
+ @Override
+    public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
+ return new java.sql.Date(input.readLong());
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
+ output.writeLong(sqlDate.getTime());
+ }
+
+ }
+
+  /*
+   * This helper function is FOR DEBUGGING ONLY.
+   * It writes the plan to a file, which can be used to isolate any Kryo bug.
+   * It needs to be removed before the final review.
+   */
+ private static void createPlanXml(Object plan) {
+ try {
+ File fl = new File("/tmp/myplan.xml-" + counter.incrementAndGet());
+ // if (fl.exists() == false) {
+ LOG.info("Plan written to file : " + fl.getAbsolutePath() + " plan class :" + plan.getClass());
+ BufferedWriter bw = new BufferedWriter(new FileWriter(fl));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ createPlanXml(plan, baos);
+ bw.write(baos.toString());
+ bw.close();
+ // } else {
+ // LOG.info("File already exists "+ fl.getAbsolutePath());
+ // }
+    } catch (IOException e1) {
+      LOG.error("Failed to write debug plan file", e1);
+    }
+
+ }
+
+ private static void createPlanXml(Object plan, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
e.setExceptionListener(new ExceptionListener() {
public void exceptionThrown(Exception e) {
@@ -613,22 +686,68 @@
}
/**
+ * Serialize the object. This helper function mainly makes sure that enums,
+   * counters, etc. are handled properly.
+ * @throws IOException
+ */
+ public static void serializeObject(Object plan, OutputStream out) throws IOException {
+    createPlanXml(plan); // FOR DEBUGGING ONLY. Comment out before the final review.
+ com.esotericsoftware.minlog.Log.trace("START SERIALIZE");
+ Kryo kryo = getKryo();
+ /*Kryo kryo = new Kryo();
+ //kryo.register(CommonToken.class, new CommonTokenSerializer());
+ kryo.register(java.sql.Date.class, new SqlDateSerializer());*/
+ Output output = new Output(out);
+ kryo.writeObject(output, plan);
+ output.close();
+ com.esotericsoftware.minlog.Log.trace("END SERIALIZE");
+ }
+
+ /**
* De-serialize an object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
- @SuppressWarnings("unchecked")
-  public static <T> T deserializeObject(InputStream in) {
- XMLDecoder d = null;
+  public static <T> T deserializeObject(InputStream in, Class<T> clazz) {
+ com.esotericsoftware.minlog.Log.trace("START DESERIALIZE");
+ Kryo kryo = getKryo();
+ /*Kryo kryo = new Kryo();
+ //kryo.register(CommonToken.class, new CommonTokenSerializer());
+ kryo.register(java.sql.Date.class, new SqlDateSerializer());*/
+ Input inp = new Input(in);
+    T t = kryo.readObject(inp, clazz);
+ inp.close();
+ com.esotericsoftware.minlog.Log.trace("END DESERIALIZE");
+ return t;
+ }
+
+  // TODO: Reuse a single Kryo instance; creating a Kryo object has noticeable
+  // overhead. One per-thread approach is sketched after getKryo() below.
+
+ protected static Kryo getKryo() {
+ Kryo kryo = new Kryo();
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ //kryo.register(CommonToken.class, new CommonTokenSerializer());
+ kryo.register(java.sql.Date.class, new SqlDateSerializer());
+ return kryo;
+ }
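+
+  // A possible way to address the TODO above (a sketch only, not part of this
+  // patch; the field name is hypothetical). Kryo instances are not thread-safe
+  // but are costly to create, so cache one per thread:
+  //
+  //   private static final ThreadLocal<Kryo> threadLocalKryo =
+  //       new ThreadLocal<Kryo>() {
+  //         @Override
+  //         protected Kryo initialValue() {
+  //           return getKryo();
+  //         }
+  //       };
+  //
+  // Callers would then use threadLocalKryo.get() in place of getKryo().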
+
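+  // FOR DEBUGGING ONLY: a throwaway harness that replays a previously dumped
+  // plan file through Kryo deserialization with trace logging enabled.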
+ public static void main(String[] args) throws Exception {
try {
- d = new XMLDecoder(in, null, null);
- return (T) d.readObject();
- } finally {
- if (null != d) {
- d.close();
- }
+ com.esotericsoftware.minlog.Log.TRACE();
+ deserializeObject(new FileInputStream("/tmp/kryo-mapredwork-2"), MapredWork.class);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
}
}
+ private static final AtomicInteger counter = new AtomicInteger(0);
+
+  public static MapredWork copyMRWork(MapredWork work) {
+    //com.esotericsoftware.minlog.Log.TRACE();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    createPlanXml(work, baos);
+    XMLDecoder d = new XMLDecoder(new ByteArrayInputStream(baos.toByteArray()), null, null);
+    try {
+      return (MapredWork) d.readObject();
+    } finally {
+      d.close();
+    }
+  }
+
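+  // Once the Kryo path is trusted, the XML round trip above could be replaced
+  // with Kryo's built-in deep copy (a sketch, assuming the Kryo 2.x copy API):
+  //
+  //   public static MapredWork copyMRWorkViaKryo(MapredWork work) {
+  //     return getKryo().copy(work);
+  //   }
+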
+
public static TableDesc defaultTd;
static {
// by default we expect ^A separated strings
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (working copy)
@@ -716,12 +716,12 @@
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+ MapredLocalWork plan = Utilities.deserializeObject(pathData, MapredLocalWork.class);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext());
} else {
- MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+ MapredWork plan = Utilities.deserializeObject(pathData, MapredWork.class);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (working copy)
@@ -35,10 +35,10 @@
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.session.SessionState;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (working copy)
@@ -39,14 +39,11 @@
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.JobConf;
/**
* Extension of ExecDriver:
* - can optionally spawn a map-reduce task from a separate jvm
@@ -69,7 +66,6 @@
private transient ContentSummary inputSummary = null;
private transient boolean runningViaChild = false;
- private transient boolean inputSizeEstimated = false;
private transient long totalInputFileSize;
private transient long totalInputNumFiles;
@@ -79,10 +75,6 @@
super();
}
- public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
- throw new RuntimeException("Illegal Constructor call");
- }
-
@Override
public int execute(DriverContext driverContext) {
@@ -408,7 +400,7 @@
if (inputSummary == null) {
inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
- int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
work.isFinalMapRed());
rWork.setNumReduceTasks(reducers);
console
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingCtx.java (working copy)
@@ -36,19 +36,19 @@
*/
public class BucketingSortingCtx implements NodeProcessorCtx {
- boolean disableBucketing;
+ private boolean disableBucketing;
// A mapping from an operator to the columns by which it's output is bucketed
-  Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
+  private Map<Operator<? extends OperatorDesc>, List<BucketCol>> bucketedColsByOp;
   // A mapping from a directory which a FileSinkOperator writes into to the columns by which that
   // output is bucketed
-  Map<String, List<BucketCol>> bucketedColsByDirectory;
+  private Map<String, List<BucketCol>> bucketedColsByDirectory;
   // A mapping from an operator to the columns by which it's output is sorted
-  Map<Operator<? extends OperatorDesc>, List<SortCol>> sortedColsByOp;
+  private Map<Operator<? extends OperatorDesc>, List<SortCol>> sortedColsByOp;
   // A mapping from a directory which a FileSinkOperator writes into to the columns by which that
   // output is sorted
-  Map<String, List<SortCol>> sortedColsByDirectory;
+  private Map<String, List<SortCol>> sortedColsByDirectory;
public BucketingSortingCtx(boolean disableBucketing) {
this.disableBucketing = disableBucketing;
@@ -189,13 +189,18 @@
* to be constant for all equivalent columns.
*/
public static final class SortCol implements BucketSortCol, Serializable {
+
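+    // No-arg constructor (presumably required so Kryo can instantiate SortCol
+    // during deserialization, now that the fields below are no longer final).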
+ public SortCol() {
+ super();
+ }
+
private static final long serialVersionUID = 1L;
// Equivalent aliases for the column
-    private final List<String> names = new ArrayList<String>();
+    private List<String> names = new ArrayList<String>();
     // Indexes of those equivalent columns
-    private final List<Integer> indexes = new ArrayList<Integer>();
+    private List<Integer> indexes = new ArrayList<Integer>();
     // Sort order (+|-)
-    private final char sortOrder;
+    private char sortOrder;
public SortCol(String name, int index, char sortOrder) {
this(sortOrder);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (working copy)
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -502,10 +500,11 @@
}
// deep copy a new mapred work from xml
// Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
- String xml = currTask.getWork().toXML();
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeObject(in);
+ MapredWork newWork = Utilities.copyMRWork(currTask.getWork());
+ //MapredWork newWork = kryo.copy(currTask.getWork());
// create map join task and set big table as i
       ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
MapRedTask newTask = newTaskAlias.getFirst();
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (working copy)
@@ -18,9 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -241,16 +239,7 @@
HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
- MapredWork clonePlan = null;
- try {
- String xmlPlan = currPlan.toXML();
- StringBuilder sb = new StringBuilder(xmlPlan);
- ByteArrayInputStream bis;
- bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
- clonePlan = Utilities.deserializeObject(bis);
- } catch (UnsupportedEncodingException e) {
- throw new SemanticException(e);
- }
+ MapredWork clonePlan = Utilities.copyMRWork(currPlan);
     Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
for (int k = 0; k < tags.length; k++) {
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (working copy)
@@ -163,7 +163,7 @@
if (parent instanceof TableScanOperator) {
tbl = ((TableScanOperator) parent).getTableDesc();
} else {
- throw new SemanticException();
+ throw new SemanticException("Parent not TableScanOperator as expected. Please report.");
}
} else {
// get parent schema
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (working copy)
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -148,11 +146,8 @@
private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOperator oldSMBJoinOp)
throws SemanticException {
try {
- String xml = currWork.toXML();
-
// deep copy a new mapred work
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork currJoinWork = Utilities.deserializeObject(in);
+ MapredWork currJoinWork = Utilities.copyMRWork(currWork);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
// Add the row resolver for the new operator
@@ -169,14 +164,11 @@
}
// create map join task and set big table as bigTablePosition
-  private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(String xml,
+  private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(MapredWork newWork,
int bigTablePosition,
SMBMapJoinOperator smbJoinOp,
QBJoinTree joinTree)
throws UnsupportedEncodingException, SemanticException {
- // deep copy a new mapred work from xml
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeObject(in);
// create a mapred task for this work
MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
.getParseContext().getConf());
@@ -240,7 +232,6 @@
   public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
ConditionalTask conditionalTask, Context context)
throws SemanticException {
-
// whether it contains a sort merge join operator
MapredWork currWork = currTask.getWork();
SMBMapJoinOperator originalSMBJoinOp = getSMBMapJoinOp(currWork);
@@ -290,7 +281,6 @@
long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(),
pathToAliases, aliasToSize);
- String xml = currJoinWork.toXML();
long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
@@ -302,7 +292,7 @@
// create map join task for the given big table position
     ObjectPair<MapRedTask, String> newTaskAlias =
- convertSMBTaskToMapJoinTask(xml, bigTablePosition, newSMBJoinOp, joinTree);
+ convertSMBTaskToMapJoinTask(Utilities.copyMRWork(currJoinWork), bigTablePosition, newSMBJoinOp, joinTree);
MapRedTask newTask = newTaskAlias.getFirst();
String bigTableAlias = newTaskAlias.getSecond();
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (working copy)
@@ -51,22 +51,22 @@
// keeps track of the right-hand-side table name of the left-semi-join, and
// its list of join keys
-  private final HashMap<String, ArrayList<ASTNode>> rhsSemijoin;
+  private transient final HashMap<String, ArrayList<ASTNode>> rhsSemijoin;
   // join conditions
-  private ArrayList<ArrayList<ASTNode>> expressions;
+  private transient ArrayList<ArrayList<ASTNode>> expressions;
   // key index to nullsafe join flag
   private ArrayList<Boolean> nullsafes;
   // filters
-  private ArrayList<ArrayList<ASTNode>> filters;
+  private transient ArrayList<ArrayList<ASTNode>> filters;
   // outerjoin-pos = other-pos:filter-len, other-pos:filter-len, ...
   private int[][] filterMap;
   // filters for pushing
-  private ArrayList<ArrayList<ASTNode>> filtersForPushing;
+  private transient ArrayList<ArrayList<ASTNode>> filtersForPushing;
// user asked for map-side join
private boolean mapSideJoin;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (working copy)
@@ -264,7 +264,11 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
-
+    // XXX Kryo's trace logging calls toString(), which fails because this class
+    // is not correctly reconstructed on read; the early return below was used
+    // to isolate that issue while debugging.
+    /*if (true) {
+      return "RowResolver";
+    }*/
     for (Map.Entry<String, LinkedHashMap<String, ColumnInfo>> e : rslvMap
.entrySet()) {
String tab = e.getKey();
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (working copy)
@@ -249,7 +249,7 @@
if (genericUDF instanceof GenericUDFBridge) {
GenericUDFBridge bridge = (GenericUDFBridge) genericUDF;
GenericUDFBridge bridge2 = (GenericUDFBridge) dest.getGenericUDF();
- if (!bridge.getUdfClass().equals(bridge2.getUdfClass())
+ if (!bridge.getUdfClassName().equals(bridge2.getUdfClassName())
|| !bridge.getUdfName().equals(bridge2.getUdfName())
|| bridge.isOperator() != bridge2.isOperator()) {
return false;
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (working copy)
@@ -52,13 +52,13 @@
* distributed on the cluster. The ExecMapper will ultimately deserialize this
* class on the data nodes and setup it's operator pipeline accordingly.
*
- * This class is also used in the explain command any property with the
+ * This class is also used in the explain command; any property with the
* appropriate annotation will be displayed in the explain output.
*/
@SuppressWarnings({"serial", "deprecation"})
public class MapWork extends BaseWork {
- private static transient final Log LOG = LogFactory.getLog(MapWork.class);
+ private static final Log LOG = LogFactory.getLog(MapWork.class);
private boolean hadoopSupportsSplittable;
@@ -102,15 +102,15 @@
public static final int SAMPLING_ON_START = 2; // sampling on task running
// the following two are used for join processing
- private QBJoinTree joinTree;
-  private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
+  private transient QBJoinTree joinTree;
+  private transient LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap;
private boolean mapperCannotSpanPartns;
// used to indicate the input is sorted, and so a BinarySearchRecordReader shoudl be used
private boolean inputFormatSorted = false;
- private transient boolean useBucketizedHiveInputFormat;
+ private boolean useBucketizedHiveInputFormat;
public MapWork() {
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy)
@@ -18,12 +18,10 @@
package org.apache.hadoop.hive.ql.plan;
-import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;
@@ -82,10 +80,4 @@
return ops;
}
-
- public String toXML() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Utilities.serializeObject(this, baos);
- return (baos.toString());
- }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (working copy)
@@ -52,7 +52,7 @@
*/
boolean isMapSide = false;
- HiveConf cfg;
+ transient HiveConf cfg;
static{
PTFUtils.makeTransient(PTFDesc.class, "llInfo");
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java (working copy)
@@ -47,7 +47,7 @@
private java.util.Properties properties;
private String serdeClassName;
- private transient String baseFileName;
+ private String baseFileName;
public void setBaseFileName(String baseFileName) {
this.baseFileName = baseFileName;
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFnGrams.java (working copy)
@@ -160,15 +160,15 @@
*/
public static class GenericUDAFnGramEvaluator extends GenericUDAFEvaluator {
// For PARTIAL1 and COMPLETE: ObjectInspectors for original data
- private StandardListObjectInspector outerInputOI;
- private StandardListObjectInspector innerInputOI;
- private PrimitiveObjectInspector inputOI;
- private PrimitiveObjectInspector nOI;
- private PrimitiveObjectInspector kOI;
- private PrimitiveObjectInspector pOI;
+ private transient StandardListObjectInspector outerInputOI;
+ private transient StandardListObjectInspector innerInputOI;
+ private transient PrimitiveObjectInspector inputOI;
+ private transient PrimitiveObjectInspector nOI;
+ private transient PrimitiveObjectInspector kOI;
+ private transient PrimitiveObjectInspector pOI;
// For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations
- private StandardListObjectInspector loi;
+ private transient StandardListObjectInspector loi;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
Index: ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java (revision 1520407)
+++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFArray.java (working copy)
@@ -38,8 +38,8 @@
@Description(name = "array",
value = "_FUNC_(n0, n1...) - Creates an array with the given elements ")
public class GenericUDFArray extends GenericUDF {
- private Converter[] converters;
- private ArrayList