diff --git src/docs/src/documentation/content/xdocs/basic.xml src/docs/src/documentation/content/xdocs/basic.xml index 9892ff5e3..d12593533 100644 --- src/docs/src/documentation/content/xdocs/basic.xml +++ src/docs/src/documentation/content/xdocs/basic.xml @@ -1225,6 +1225,20 @@ dump X; (joe,18,2.5,joe,18,2.5) +
+ Nulls and FLATTEN Operator +

The FLATTEN operator handles null values differently based on its schema.

+

For null tuples, FLATTEN(null) produces multiple nulls based on the number of elements in the schema for that field. + If the tuple has no schema, FLATTEN(null) simply returns a single null.

+

For null bags, we would have liked to discard the row just like we do with flatten of an empty bag. + However, it was too late by the time we noticed this inconsistency. + In order to preserve backward compatibility, FLATTEN(null) for bag produces multiple nulls + based on the number of elements defined for the schema of this bag. + If there is no schema, a single null is returned.

+

For bags containing some null tuples, flatten follows the same rule as the flatten of null tuples described above.

+

For null maps, FLATTEN(null) produces 2 nulls to represent the key and the value.

+

For null with other types, FLATTEN(null) simply returns a single null.

+
@@ -5453,8 +5467,11 @@ D = foreach C generate A::y, z; -- Cannot simply refer to y as it can refer to A we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.

+

For other types, flatten becomes a no-op and simply returns the passed value.

+

Also note that the flatten of empty bag will result in that row being discarded; no output is generated. (See also Drop Nulls Before a Join.)

+

As for flatten with null values, see Nulls and FLATTEN Operator.

grunt> cat empty.bag diff --git src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java index aba51319c..3b6733f28 100644 --- src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java +++ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java @@ -74,6 +74,8 @@ public class POForEach extends PhysicalOperator { // so we can also save on the Boolean.booleanValue() calls protected boolean[] isToBeFlattenedArray; + private int[] flattenNumFields = null; + protected int noItems; //Since the plan has a generate, this needs to be maintained @@ -446,6 +448,8 @@ public class POForEach extends PhysicalOperator { } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) { its[i] = ((Map)bags[i]).entrySet().iterator(); } else { + // This includes FLATTEN(null) for bag and map + // in addition to non-null values from other data types its[i] = null; } } @@ -490,7 +494,6 @@ public class POForEach extends PhysicalOperator { if(getReporter()!=null) { getReporter().progress(); } - //createTuple(data); res.result = createTuple(data); res.returnStatus = POStatus.STATUS_OK; return res; @@ -545,8 +548,8 @@ public class POForEach extends PhysicalOperator { if (knownSize) { out.set(idx++, t.get(j)); } else { - out.append(t.get(j)); - } + out.append(t.get(j)); + } } } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) { Map.Entry entry = (Map.Entry)in; @@ -557,14 +560,25 @@ public class POForEach extends PhysicalOperator { out.append(entry.getKey()); out.append(entry.getValue()); } + } else if (isToBeFlattenedArray[i] && in == null && + flattenNumFields != null && flattenNumFields[i] != 0 ) { + // Handling of FLATTEN(null) here. 
+ // Expanding to multiple nulls depending on the schema + for( int j = 0; j < flattenNumFields[i]; j++ ) { + if (knownSize) { + out.set(idx++, null); + } else { + out.append(null); + } + } } else { if (knownSize) { out.set(idx++, in); - } else { - out.append(in); + } else { + out.append(in); + } } } - } if (inpTuple != null) { return illustratorMarkup(inpTuple, out, 0); } else { @@ -706,6 +720,7 @@ public class POForEach extends PhysicalOperator { clone.addOriginalLocation(alias, getOriginalLocations()); clone.endOfAllInputProcessing = endOfAllInputProcessing; clone.mapSideOnly = mapSideOnly; + clone.flattenNumFields = flattenNumFields; return clone; } @@ -714,6 +729,10 @@ public class POForEach extends PhysicalOperator { return processingPlan; } + public void setFlattenNumFields (int [] flattenNumFields) { + this.flattenNumFields = flattenNumFields; + } + protected void setUpFlattens(List isToBeFlattened) { if(isToBeFlattened == null) { isToBeFlattenedArray = null; diff --git src/org/apache/pig/impl/util/TupleFormat.java src/org/apache/pig/impl/util/TupleFormat.java index ad76b52e6..1a0066369 100644 --- src/org/apache/pig/impl/util/TupleFormat.java +++ src/org/apache/pig/impl/util/TupleFormat.java @@ -47,32 +47,34 @@ public class TupleFormat { public static String format(Tuple tuple) { StringBuilder sb = new StringBuilder(); sb.append('('); - for (int i = 0; i < tuple.size(); ++i) { - try { - Object d = tuple.get(i); - if (d != null) { - if (d instanceof Map) { - sb.append(DataType.mapToString((Map) d)); - } else if (d instanceof Tuple) { - Tuple t = (Tuple) d; - sb.append(TupleFormat.format(t)); - } else if (d instanceof DataBag){ - DataBag bag=(DataBag)d; - sb.append(BagFormat.format(bag)); + if( tuple != null ) { + for (int i = 0; i < tuple.size(); ++i) { + try { + Object d = tuple.get(i); + if (d != null) { + if (d instanceof Map) { + sb.append(DataType.mapToString((Map) d)); + } else if (d instanceof Tuple) { + Tuple t = (Tuple) d; + 
sb.append(TupleFormat.format(t)); + } else if (d instanceof DataBag){ + DataBag bag=(DataBag)d; + sb.append(BagFormat.format(bag)); + } + else { + sb.append(d.toString()); + } + } else { + sb.append(""); } - else { - sb.append(d.toString()); - } - } else { - sb.append(""); + if (i != tuple.size() - 1) + sb.append(","); + } catch (ExecException e) { + e.printStackTrace(); + mLog.warn("Exception when format tuple", e); } - if (i != tuple.size() - 1) - sb.append(","); - } catch (ExecException e) { - e.printStackTrace(); - mLog.warn("Exception when format tuple", e); - } + } } sb.append(')'); return sb.toString(); diff --git src/org/apache/pig/newplan/logical/relational/LOGenerate.java src/org/apache/pig/newplan/logical/relational/LOGenerate.java index 2609c2dfe..13f5b387b 100644 --- src/org/apache/pig/newplan/logical/relational/LOGenerate.java +++ src/org/apache/pig/newplan/logical/relational/LOGenerate.java @@ -33,6 +33,7 @@ import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchem public class LOGenerate extends LogicalRelationalOperator { private List outputPlans; private boolean[] flattenFlags; + private int[] flattenNumFields; // mUserDefinedSchema is the original input from the user, we don't suppose // to store uid in mUserDefinedSchema private List mUserDefinedSchema = null; @@ -74,7 +75,9 @@ public class LOGenerate extends LogicalRelationalOperator { outputPlanSchemas = new ArrayList(); expSchemas = new ArrayList(); + flattenNumFields = new int[outputPlans.size()]; for(int i=0; i execJobs = pig.executeBatch(); + for( ExecJob execJob : execJobs ) { + assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED ); + } + List actualResults = data.get("output"); + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')", + + //flatten(null-bag) on schema {(a1_1:int, a1_2:chararray)} expands to (null, null) + "('', '', 
'11', '12')", "('', '', '13', '14')", + "('k', 'l', '', '')", "('m', 'n', '', '')", + + //flatten(null-tuple-from-bag) on schema {(a1_1:int, a1_2:chararray)} also expands to (null, null) + "('', '', '5', '6')", "('', '', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')", + "('g', 'h', '9', '10')", "('g', 'h', '', '')", "('i', 'j', '9', '10')", "('i', 'j', '', '')" }); + + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } + + @Test + public void testFlattenOnNullMap() throws Exception { + Storage.Data data = Storage.resetData(pig); + data.set("input", + Storage.tuple( + Storage.map("a","b", + "c","d"), + Storage.map("1","2", + "3","4") + ) + , + Storage.tuple( + null, + Storage.map("11","12", + "13","14") + ), + Storage.tuple( + Storage.map("k","l", + "m","n"), + null + ) + ); + pig.setBatchOn(); + pig.registerQuery("A = load 'input' using mock.Storage() as (map1:map [chararray], map2:map [chararray]);"); + pig.registerQuery("B = foreach A GENERATE FLATTEN(map1), FLATTEN(map2);"); + pig.registerQuery("store B into 'output' using mock.Storage();"); + List execJobs = pig.executeBatch(); + for( ExecJob execJob : execJobs ) { + assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED ); + } + List actualResults = data.get("output"); + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')", + // flatten(null-map) should expand to (null, null) + "('', '', '11', '12')", "('', '', '13', '14')", "('k', 'l', '', '')", "('m', 'n', '', '')" + }); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); + } + + @Test + public void testFlattenOnNullTuple() throws Exception { + Storage.Data data = Storage.resetData(pig); + data.set("input", + Storage.tuple( + Storage.tuple("a","b"), + Storage.tuple("1","2") + ), + Storage.tuple( + null, + Storage.tuple("3","4") + ), + Storage.tuple( + 
Storage.tuple("c","d"), + null + ), + Storage.tuple( + Storage.tuple("e", null), + Storage.tuple(null,"5") + ) + ); + pig.setBatchOn(); + pig.registerQuery("A = load 'input' using mock.Storage() as (tuple1:tuple (a1:chararray, a2:chararray), tuple2:tuple (a3:chararray, a4:chararray));"); + pig.registerQuery("B = foreach A GENERATE FLATTEN(tuple1), FLATTEN(tuple2);"); + pig.registerQuery("store B into 'output' using mock.Storage();"); + List execJobs = pig.executeBatch(); + for( ExecJob execJob : execJobs ) { + assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED ); + } + List actualResults = data.get("output"); + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('a', 'b', '1', '2')", "('', '', '3', '4')", "('c', 'd', '', '')", "('e', '', '', '5')" }); + + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } + + @Test + public void testFlattenOnNullWithNoSchema() throws Exception { + Storage.Data data = Storage.resetData(pig); + data.set("input", + Storage.tuple( + null, + Storage.bag( + Storage.tuple("1","2"), + Storage.tuple("3","4")) + ), + + Storage.tuple( + Storage.bag( + null, + Storage.tuple("e","f")), + Storage.bag( + Storage.tuple("5","6"), + Storage.tuple("7","8")) + ), + + Storage.tuple( + null, + Storage.map("9","10") + ), + + Storage.tuple( + Storage.tuple("g","h"), + null + ), + + Storage.tuple( + Storage.tuple("13", null), + Storage.tuple(null,"16") + ) + ); + pig.setBatchOn(); + pig.registerQuery("A = load 'input' using mock.Storage() as (a1, a2);"); + pig.registerQuery("B = foreach A GENERATE FLATTEN(a1), FLATTEN(a2);"); + pig.registerQuery("store B into 'output' using mock.Storage();"); + List execJobs = pig.executeBatch(); + for( ExecJob execJob : execJobs ) { + assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED ); + } + List actualResults = data.get("output"); + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('', '1', '2')", "('', '3', 
'4')", //since no schema, flatten(null) ==> one null + "('', '5', '6')", "('', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')", + "('', '9', '10')", + "('g', 'h', '')", + "('13', '', '', '16')"}); + + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } + + @Test + public void testFlattenOnNullBagWithColumnPrune() throws Exception { + Storage.Data data = Storage.resetData(pig); + data.set("input", + Storage.tuple( + 1, + Storage.bag( + Storage.tuple("a","b"), + Storage.tuple("c","d")), + Storage.bag( + Storage.tuple("1","2"), + Storage.tuple("3","4")) + ), + Storage.tuple( + 2, + null, + Storage.bag( + Storage.tuple("11","12"), + Storage.tuple("13","14")) + ), + Storage.tuple( + 3, + Storage.bag( + Storage.tuple("k","l"), + Storage.tuple("m","n")), + null + ), + Storage.tuple( + 4, + Storage.bag( + null, + Storage.tuple("e","f")), + Storage.bag( + Storage.tuple("5","6"), + Storage.tuple("7","8")) + ), + Storage.tuple( + 5, + Storage.bag( + Storage.tuple("g","h"), + Storage.tuple("i","j")), + Storage.bag( + Storage.tuple("9","10"), + null + ) + ) + ); + pig.setBatchOn(); + pig.registerQuery("A = load 'input' using mock.Storage() as (a0:int, bag1:bag {(a1_1:int, a1_2:chararray)}, bag2:bag{(a2_1:chararray, a2_2:chararray)});"); + pig.registerQuery("B = foreach A GENERATE FLATTEN(bag1), FLATTEN(bag2);"); + pig.registerQuery("store B into 'output' using mock.Storage();"); + List execJobs = pig.executeBatch(); + for( ExecJob execJob : execJobs ) { + assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED ); + } + List actualResults = data.get("output"); + List expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')", + "('', '', '11', '12')", "('', '', '13', '14')", + "('k', 'l', '', '')", "('m', 'n', '', '')", + "('', '', '5', '6')", "('', '', '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')", + "('g', 'h', '9', 
'10')", "('g', 'h', '', '')", "('i', 'j', '9', '10')", "('i', 'j', '', '')" }); + + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } } diff --git test/org/apache/pig/test/TestTuple.java test/org/apache/pig/test/TestTuple.java index 9dc32b433..680bdbf92 100644 --- test/org/apache/pig/test/TestTuple.java +++ test/org/apache/pig/test/TestTuple.java @@ -70,6 +70,9 @@ public class TestTuple { assertEquals( "(12,[pig#scalability],,12,1.2,(innerTuple),{(innerTuple)})", TupleFormat.format(tuple)); + + //check if TupleFormat can handle null tuple + assertEquals("()", TupleFormat.format(null)); } catch (ExecException e) { e.printStackTrace(); fail(); diff --git test/org/apache/pig/test/Util.java test/org/apache/pig/test/Util.java index 8d4282dfb..f2cec4529 100644 --- test/org/apache/pig/test/Util.java +++ test/org/apache/pig/test/Util.java @@ -613,8 +613,7 @@ public class Util { Collections.sort(actualResList); Collections.sort(expectedResList); - Assert.assertEquals("Comparing actual and expected results. ", - expectedResList, actualResList); + checkQueryOutputs(actualResList.iterator(), expectedResList); }