diff --git eclipse-templates/.classpath eclipse-templates/.classpath
index 87ca207..d131007 100644
--- eclipse-templates/.classpath
+++ eclipse-templates/.classpath
@@ -50,6 +50,7 @@
+
@@ -93,15 +94,15 @@
-
-
-
+
+
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 8633321..1840eea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -68,6 +69,50 @@ public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
super(mjop);
}
+ /*
+ * We need the base (operator.java) implementation of start/endGroup.
+ * The parent class has functionality in those that map join can't use.
+ */
+ @Override
+ public void endGroup() throws HiveException {
+ LOG.debug("Ending group");
+
+ if (childOperators == null) {
+ return;
+ }
+
+ if (fatalError) {
+ return;
+ }
+
+ LOG.debug("Ending group for children:");
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.endGroup();
+ }
+
+ LOG.debug("End group Done");
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ LOG.debug("Starting group");
+
+ if (childOperators == null) {
+ return;
+ }
+
+ if (fatalError) {
+ return;
+ }
+
+ LOG.debug("Starting group for children:");
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ op.startGroup();
+ }
+
+ LOG.debug("Start group Done");
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
@@ -126,7 +171,8 @@ public void generateMapMetaData() throws HiveException, SerDeException {
private void loadHashTable() throws HiveException {
- if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+ if (this.getExecContext().getLocalWork() == null
+ || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
if (hashTblInitedOnce) {
return;
} else {
@@ -159,8 +205,8 @@ public void cleanUpInputFileChangedOp() throws HiveException {
public void processOp(Object row, int tag) throws HiveException {
try {
if (firstRow) {
- // generate the map metadata
generateMapMetaData();
+ loadHashTable();
firstRow = false;
}
alias = (byte)tag;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
index b2c5ddc..4bc85ab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
@@ -80,7 +80,7 @@ public boolean equals(Object obj) {
return true;
}
@SuppressWarnings("unchecked")
- public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
throws IOException, SerDeException {
SerDe serde = context.getSerDe();
container.readFields(in);
@@ -92,8 +92,21 @@ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writab
key = value.toArray();
}
}
-
- public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+
+ @SuppressWarnings("unchecked")
+ public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException {
+ SerDe serde = context.getSerDe();
+ List