diff --git eclipse-templates/.classpath eclipse-templates/.classpath
index 87ca207..d131007 100644
--- eclipse-templates/.classpath
+++ eclipse-templates/.classpath
@@ -50,6 +50,7 @@
+
@@ -93,15 +94,15 @@
-
-
-
+
+
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 8633321..b077c39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -69,6 +70,46 @@ public MapJoinOperator(AbstractMapJoinOperator extends MapJoinDesc> mjop) {
}
@Override
+ public void endGroup() throws HiveException {
+ LOG.debug("Ending group");
+
+ if (childOperators == null) {
+ return;
+ }
+
+ if (fatalError) {
+ return;
+ }
+
+ LOG.debug("Ending group for children:");
+ for (Operator extends OperatorDesc> op : childOperators) {
+ op.endGroup();
+ }
+
+ LOG.debug("End group Done");
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ LOG.debug("Starting group");
+
+ if (childOperators == null) {
+ return;
+ }
+
+ if (fatalError) {
+ return;
+ }
+
+ LOG.debug("Starting group for children:");
+ for (Operator extends OperatorDesc> op : childOperators) {
+ op.startGroup();
+ }
+
+ LOG.debug("Start group Done");
+ }
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
@@ -126,7 +167,8 @@ public void generateMapMetaData() throws HiveException, SerDeException {
private void loadHashTable() throws HiveException {
- if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+ if (this.getExecContext().getLocalWork() == null
+ || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
if (hashTblInitedOnce) {
return;
} else {
@@ -159,8 +201,8 @@ public void cleanUpInputFileChangedOp() throws HiveException {
public void processOp(Object row, int tag) throws HiveException {
try {
if (firstRow) {
- // generate the map metadata
generateMapMetaData();
+ loadHashTable();
firstRow = false;
}
alias = (byte)tag;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
index b2c5ddc..3700325 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
@@ -36,7 +36,7 @@
private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
private Object[] key;
-
+
public MapJoinKey(Object[] key) {
this.key = key;
}
@@ -57,8 +57,8 @@ public boolean hasAnyNulls(boolean[] nullsafes){
}
return false;
}
-
-
+
+
@Override
public int hashCode() {
final int prime = 31;
@@ -68,19 +68,24 @@ public int hashCode() {
}
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
MapJoinKey other = (MapJoinKey) obj;
- if (!Arrays.equals(key, other.key))
+ if (!Arrays.equals(key, other.key)) {
return false;
+ }
return true;
}
+
@SuppressWarnings("unchecked")
- public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
throws IOException, SerDeException {
SerDe serde = context.getSerDe();
container.readFields(in);
@@ -92,8 +97,21 @@ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writab
key = value.toArray();
}
}
-
- public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+
+ @SuppressWarnings("unchecked")
+ public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException {
+ SerDe serde = context.getSerDe();
+ List