Description
Below is my script:

register myudf.jar;

c01 = LOAD 'input' USING org.test.MyTableLoader('');
c02 = FILTER c01 BY result == 'OK' AND formatted IS NOT NULL AND formatted != '';
c03 = FOREACH c02 GENERATE url, formatted, FLATTEN(usage);
c04 = FOREACH c03 GENERATE usage::domain AS domain, url, formatted;

doc_001 = FOREACH c04 GENERATE domain, url, FLATTEN(MyExtractor(formatted)) AS category;
doc_004_1 = GROUP doc_001 BY (domain, url);
doc_005 = FOREACH doc_004_1 GENERATE group.domain AS domain, group.url AS url, doc_001.category AS category;
STORE doc_005 INTO 'out_final' USING PigStorage();

review1 = FOREACH c04 GENERATE domain, url, MyExtractor(formatted) AS rev;
review2 = FILTER review1 BY SIZE(rev) > 0;
joinresult = JOIN review2 BY (domain, url), doc_005 BY (domain, url);
finalresult = FOREACH joinresult GENERATE doc_005::category;
STORE finalresult INTO 'out_final' USING PigStorage();
The script fails while building the plan, when the logical optimization rule AddForEach is applied:
ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2229: Couldn't find matching uid -1 for project (Name: Project Type: bytearray Uid: 106 Input: 0 Column: 5)
The problem occurs when I include doc_005::category in the projection for the relation finalresult. This field originates from the UDF org.vivek.udfs.MyExtractor (source given below).
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.*;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class MyExtractor extends EvalFunc<DataBag> {

    @Override
    public Schema outputSchema(Schema arg0) {
        try {
            return Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY);
        } catch (FrontendException e) {
            System.err.println("Error while generating schema. " + e);
            return new Schema(new FieldSchema(null, DataType.BAG));
        }
    }

    @Override
    public DataBag exec(Tuple inputTuple) throws IOException {
        try {
            Tuple tp2 = TupleFactory.getInstance().newTuple(1);
            tp2.set(0, (inputTuple.get(0).toString() + inputTuple.hashCode()));
            DataBag retBag = BagFactory.getInstance().newDefaultBag();
            retBag.add(tp2);
            return retBag;
        } catch (Exception e) {
            throw new IOException(" Caught exception", e);
        }
    }
}
The script runs fine if I disable the AddForEach rule with -t AddForEach.
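For reference, the workaround invocation might look like the sketch below. The wrapper function name and the script file name are hypothetical and used only for illustration; the only part taken from this report is the -t AddForEach option itself.

```shell
# Hypothetical helper: run a Pig script with the AddForEach optimizer rule disabled.
# -t turns off the named logical optimization rule for this run only.
run_without_addforeach() {
    pig -t AddForEach "$1"
}

# Usage (script name is an assumption, not from the report):
#   run_without_addforeach myscript.pig
```

This only sidesteps the failure; the plan still cannot be built with the rule enabled.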