diff --git src/docs/src/documentation/content/xdocs/basic.xml src/docs/src/documentation/content/xdocs/basic.xml
index 9892ff5e3..9f14164fa 100644
--- src/docs/src/documentation/content/xdocs/basic.xml
+++ src/docs/src/documentation/content/xdocs/basic.xml
@@ -1225,6 +1225,20 @@ dump X;
(joe,18,2.5,joe,18,2.5)
+
+ Nulls and FLATTEN Operator
+ The FLATTEN operator handles null values differently based on its schema.
+ For null tuples, FLATTEN(null) produces multiple nulls based on the number of elements in the schema for that field.
+ If the tuple has no schema, FLATTEN(null) simply returns a single null.
+ For null bags, we would have liked to discard the row just like we do with flatten of an empty bag.
+ However, it was too late by the time we noticed this inconsistency.
+ In order to preserve backward compatibility, FLATTEN(null) for a bag produces multiple nulls
+ based on the number of elements defined for the schema of this bag.
+ If no schema, a single null is returned.
+ For bags containing some null tuples, it follows the same rule as flatten of null tuples described above.
+ For null maps, FLATTEN(null) produces 2 nulls to represent the key and the value.
+ For null with other types, FLATTEN(null) simply returns a single null.
+
@@ -5453,8 +5467,11 @@ D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to A
we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
+ For other types, flatten becomes a no-op and simply returns the passed value.
+
Also note that the flatten of empty bag will result in that row being discarded; no output is generated.
(See also Drop Nulls Before a Join .)
+ As for flatten with null values, see Nulls and FLATTEN Operator .
grunt> cat empty.bag
diff --git src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
index aba51319c..e9fa5a610 100644
--- src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
+++ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
@@ -74,6 +74,8 @@ public class POForEach extends PhysicalOperator {
// so we can also save on the Boolean.booleanValue() calls
protected boolean[] isToBeFlattenedArray;
+ private int[] flattenNumFields = null;
+
protected int noItems;
//Since the plan has a generate, this needs to be maintained
@@ -446,6 +448,8 @@ public class POForEach extends PhysicalOperator {
} else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
+ // This includes FLATTEN(null) for bag and map
+ // in addition to non-null values from other data types
its[i] = null;
}
}
@@ -490,7 +494,6 @@ public class POForEach extends PhysicalOperator {
if(getReporter()!=null) {
getReporter().progress();
}
- //createTuple(data);
res.result = createTuple(data);
res.returnStatus = POStatus.STATUS_OK;
return res;
@@ -538,17 +541,23 @@ public class POForEach extends PhysicalOperator {
for(int i = 0; i < data.length; ++i) {
Object in = data[i];
- if(isToBeFlattenedArray[i] && in instanceof Tuple) {
+ if(!isToBeFlattenedArray[i]) {
+ if (knownSize) {
+ out.set(idx++, in);
+ } else {
+ out.append(in);
+ }
+ } else if(in instanceof Tuple) {
Tuple t = (Tuple)in;
int size = t.size();
for(int j = 0; j < size; ++j) {
if (knownSize) {
out.set(idx++, t.get(j));
} else {
- out.append(t.get(j));
- }
+ out.append(t.get(j));
+ }
}
- } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+ } else if (in instanceof Map.Entry) {
Map.Entry entry = (Map.Entry)in;
if (knownSize) {
out.set(idx++, entry.getKey());
@@ -557,14 +566,25 @@ public class POForEach extends PhysicalOperator {
out.append(entry.getKey());
out.append(entry.getValue());
}
+ } else if (in == null &&
+ flattenNumFields != null && flattenNumFields[i] != 0 ) {
+ // Handling of FLATTEN(null) here.
+ // Expanding to multiple nulls depending on the schema
+ for( int j = 0; j < flattenNumFields[i]; j++ ) {
+ if (knownSize) {
+ out.set(idx++, null);
+ } else {
+ out.append(null);
+ }
+ }
} else {
if (knownSize) {
out.set(idx++, in);
- } else {
- out.append(in);
+ } else {
+ out.append(in);
+ }
}
}
- }
if (inpTuple != null) {
return illustratorMarkup(inpTuple, out, 0);
} else {
@@ -706,6 +726,7 @@ public class POForEach extends PhysicalOperator {
clone.addOriginalLocation(alias, getOriginalLocations());
clone.endOfAllInputProcessing = endOfAllInputProcessing;
clone.mapSideOnly = mapSideOnly;
+ clone.flattenNumFields = flattenNumFields;
return clone;
}
@@ -714,6 +735,10 @@ public class POForEach extends PhysicalOperator {
return processingPlan;
}
+ public void setFlattenNumFields (int [] flattenNumFields) {
+ this.flattenNumFields = flattenNumFields;
+ }
+
protected void setUpFlattens(List isToBeFlattened) {
if(isToBeFlattened == null) {
isToBeFlattenedArray = null;
diff --git src/org/apache/pig/impl/util/TupleFormat.java src/org/apache/pig/impl/util/TupleFormat.java
index ad76b52e6..1a0066369 100644
--- src/org/apache/pig/impl/util/TupleFormat.java
+++ src/org/apache/pig/impl/util/TupleFormat.java
@@ -47,32 +47,34 @@ public class TupleFormat {
public static String format(Tuple tuple) {
StringBuilder sb = new StringBuilder();
sb.append('(');
- for (int i = 0; i < tuple.size(); ++i) {
- try {
- Object d = tuple.get(i);
- if (d != null) {
- if (d instanceof Map) {
- sb.append(DataType.mapToString((Map) d));
- } else if (d instanceof Tuple) {
- Tuple t = (Tuple) d;
- sb.append(TupleFormat.format(t));
- } else if (d instanceof DataBag){
- DataBag bag=(DataBag)d;
- sb.append(BagFormat.format(bag));
+ if( tuple != null ) {
+ for (int i = 0; i < tuple.size(); ++i) {
+ try {
+ Object d = tuple.get(i);
+ if (d != null) {
+ if (d instanceof Map) {
+ sb.append(DataType.mapToString((Map) d));
+ } else if (d instanceof Tuple) {
+ Tuple t = (Tuple) d;
+ sb.append(TupleFormat.format(t));
+ } else if (d instanceof DataBag){
+ DataBag bag=(DataBag)d;
+ sb.append(BagFormat.format(bag));
+ }
+ else {
+ sb.append(d.toString());
+ }
+ } else {
+ sb.append("");
}
- else {
- sb.append(d.toString());
- }
- } else {
- sb.append("");
+ if (i != tuple.size() - 1)
+ sb.append(",");
+ } catch (ExecException e) {
+ e.printStackTrace();
+ mLog.warn("Exception when format tuple", e);
}
- if (i != tuple.size() - 1)
- sb.append(",");
- } catch (ExecException e) {
- e.printStackTrace();
- mLog.warn("Exception when format tuple", e);
- }
+ }
}
sb.append(')');
return sb.toString();
diff --git src/org/apache/pig/newplan/logical/relational/LOGenerate.java src/org/apache/pig/newplan/logical/relational/LOGenerate.java
index 2609c2dfe..13f5b387b 100644
--- src/org/apache/pig/newplan/logical/relational/LOGenerate.java
+++ src/org/apache/pig/newplan/logical/relational/LOGenerate.java
@@ -33,6 +33,7 @@ import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchem
public class LOGenerate extends LogicalRelationalOperator {
private List outputPlans;
private boolean[] flattenFlags;
+ private int[] flattenNumFields;
// mUserDefinedSchema is the original input from the user, we don't suppose
// to store uid in mUserDefinedSchema
private List mUserDefinedSchema = null;
@@ -74,7 +75,9 @@ public class LOGenerate extends LogicalRelationalOperator {
outputPlanSchemas = new ArrayList();
expSchemas = new ArrayList();
+ flattenNumFields = new int[outputPlans.size()];
for(int i=0; i execJobs = pig.executeBatch();
+ for( ExecJob execJob : execJobs ) {
+ assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+ }
+ List actualResults = data.get("output");
+ List expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+
+ //flatten(null-bag) on schema {(a1_1:int, a1_2:chararray)} expands to (null, null)
+ "('', '', '11', '12')", "('', '', '13', '14')",
+ "('k', 'l', '', '')", "('m', 'n', '', '')",
+
+ //flatten(null-tuple-from-bag) on schema {(a1_1:int, a1_2:chararray)} also expands to (null, null)
+ "('', '', '5', '6')", "('', '', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+ "('g', 'h', '9', '10')", "('g', 'h', '', '')", "('i', 'j', '9', '10')", "('i', 'j', '', '')" });
+
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testFlattenOnNullMap() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(
+ Storage.map("a","b",
+ "c","d"),
+ Storage.map("1","2",
+ "3","4")
+ )
+ ,
+ Storage.tuple(
+ null,
+ Storage.map("11","12",
+ "13","14")
+ ),
+ Storage.tuple(
+ Storage.map("k","l",
+ "m","n"),
+ null
+ )
+ );
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (map1:map [chararray], map2:map [chararray]);");
+ pig.registerQuery("B = foreach A GENERATE FLATTEN(map1), FLATTEN(map2);");
+ pig.registerQuery("store B into 'output' using mock.Storage();");
+ List execJobs = pig.executeBatch();
+ for( ExecJob execJob : execJobs ) {
+ assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+ }
+ List actualResults = data.get("output");
+ List expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+ // flatten(null-map) should expand to (null, null)
+ "('', '', '11', '12')", "('', '', '13', '14')", "('k', 'l', '', '')", "('m', 'n', '', '')"
+ });
+ Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testFlattenOnNullTuple() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(
+ Storage.tuple("a","b"),
+ Storage.tuple("1","2")
+ ),
+ Storage.tuple(
+ null,
+ Storage.tuple("3","4")
+ ),
+ Storage.tuple(
+ Storage.tuple("c","d"),
+ null
+ ),
+ Storage.tuple(
+ Storage.tuple("e", null),
+ Storage.tuple(null,"5")
+ )
+ );
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (tuple1:tuple (a1:chararray, a2:chararray), tuple2:tuple (a3:chararray, a4:chararray));");
+ pig.registerQuery("B = foreach A GENERATE FLATTEN(tuple1), FLATTEN(tuple2);");
+ pig.registerQuery("store B into 'output' using mock.Storage();");
+ List execJobs = pig.executeBatch();
+ for( ExecJob execJob : execJobs ) {
+ assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+ }
+ List actualResults = data.get("output");
+ List expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('a', 'b', '1', '2')", "('', '', '3', '4')", "('c', 'd', '', '')", "('e', '', '', '5')" });
+
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testFlattenOnNullWithNoSchema() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(
+ null,
+ Storage.bag(
+ Storage.tuple("1","2"),
+ Storage.tuple("3","4"))
+ ),
+
+ Storage.tuple(
+ Storage.bag(
+ null,
+ Storage.tuple("e","f")),
+ Storage.bag(
+ Storage.tuple("5","6"),
+ Storage.tuple("7","8"))
+ ),
+
+ Storage.tuple(
+ null,
+ Storage.map("9","10")
+ ),
+
+ Storage.tuple(
+ Storage.tuple("g","h"),
+ null
+ ),
+
+ Storage.tuple(
+ Storage.tuple("13", null),
+ Storage.tuple(null,"16")
+ )
+ );
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (a1, a2);");
+ pig.registerQuery("B = foreach A GENERATE FLATTEN(a1), FLATTEN(a2);");
+ pig.registerQuery("store B into 'output' using mock.Storage();");
+ List execJobs = pig.executeBatch();
+ for( ExecJob execJob : execJobs ) {
+ assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+ }
+ List actualResults = data.get("output");
+ List expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('', '1', '2')", "('', '3', '4')", //since no schema, flatten(null) ==> one null
+ "('', '5', '6')", "('', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+ "('', '9', '10')",
+ "('g', 'h', '')",
+ "('13', '', '', '16')"});
+
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
+ public void testFlattenOnNullBagWithColumnPrune() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(
+ 1,
+ Storage.bag(
+ Storage.tuple("a","b"),
+ Storage.tuple("c","d")),
+ Storage.bag(
+ Storage.tuple("1","2"),
+ Storage.tuple("3","4"))
+ ),
+ Storage.tuple(
+ 2,
+ null,
+ Storage.bag(
+ Storage.tuple("11","12"),
+ Storage.tuple("13","14"))
+ ),
+ Storage.tuple(
+ 3,
+ Storage.bag(
+ Storage.tuple("k","l"),
+ Storage.tuple("m","n")),
+ null
+ ),
+ Storage.tuple(
+ 4,
+ Storage.bag(
+ null,
+ Storage.tuple("e","f")),
+ Storage.bag(
+ Storage.tuple("5","6"),
+ Storage.tuple("7","8"))
+ ),
+ Storage.tuple(
+ 5,
+ Storage.bag(
+ Storage.tuple("g","h"),
+ Storage.tuple("i","j")),
+ Storage.bag(
+ Storage.tuple("9","10"),
+ null
+ )
+ )
+ );
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (a0:int, bag1:bag {(a1_1:int, a1_2:chararray)}, bag2:bag{(a2_1:chararray, a2_2:chararray)});");
+ pig.registerQuery("B = foreach A GENERATE FLATTEN(bag1), FLATTEN(bag2);");
+ pig.registerQuery("store B into 'output' using mock.Storage();");
+ List execJobs = pig.executeBatch();
+ for( ExecJob execJob : execJobs ) {
+ assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+ }
+ List actualResults = data.get("output");
+ List expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+ "('', '', '11', '12')", "('', '', '13', '14')",
+ "('k', 'l', '', '')", "('m', 'n', '', '')",
+ "('', '', '5', '6')", "('', '', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+ "('g', 'h', '9', '10')", "('g', 'h', '', '')", "('i', 'j', '9', '10')", "('i', 'j', '', '')" });
+
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
}
diff --git test/org/apache/pig/test/TestTuple.java test/org/apache/pig/test/TestTuple.java
index 9dc32b433..680bdbf92 100644
--- test/org/apache/pig/test/TestTuple.java
+++ test/org/apache/pig/test/TestTuple.java
@@ -70,6 +70,9 @@ public class TestTuple {
assertEquals(
"(12,[pig#scalability],,12,1.2,(innerTuple),{(innerTuple)})",
TupleFormat.format(tuple));
+
+ //check if TupleFormat can handle null tuple
+ assertEquals("()", TupleFormat.format(null));
} catch (ExecException e) {
e.printStackTrace();
fail();