Pig / PIG-4856 Optimization for pig on spark / PIG-5054

Initialize SchemaTupleBackend correctly in backend in spark mode if spark job has more than 1 stage


Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: spark-branch
    • Component/s: spark
    • Labels: None

    Description

      After PIG-4970, we removed the serialization and deserialization of jobConf in spark mode. But when running the pigmix script L5.pig:

      register pigperf.jar;
      A = load '/user/pig/tests/data/pigmix/page_views' using org.apache.pig.test.udf.storefunc.PigPerformanceLoader()
          as (user, action, timespent, query_term, ip_addr, timestamp,
              estimated_revenue, page_info, page_links);
      B = foreach A generate user;
      alpha = load '/user/pig/tests/data/pigmix/users' using PigStorage('\u0001') as (name, phone, address,
              city, state, zip);
      beta = foreach alpha generate name;
      C = cogroup beta by name, B by user parallel 40;
      D = filter C by COUNT(beta) == 0;
      E = foreach D generate group;
      store E into 'L5out';
      

      the following error is thrown in the log:

      java.lang.RuntimeException: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:89)
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
           at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
           at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
           at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:57)
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.hasNext(OutputConsumerIterator.java:96)
           at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
           at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
           at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
           at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
           at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
           at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
           at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
           at org.apache.spark.scheduler.Task.run(Task.scala:89)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
           at java.lang.Thread.run(Thread.java:722)
      Caused by: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Error while executing ForEach at [C[-1,-1]]
           at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:329)
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter$ForEachFunction$1$1.getNextResult(ForEachConverter.java:87)
           at org.apache.pig.backend.hadoop.executionengine.spark.converter.OutputConsumerIterator.readNext(OutputConsumerIterator.java:69)
           ... 20 more
      Caused by: java.lang.RuntimeException: initialize was not called! Even when SchemaTuple feature is not set, it should be called.
           at org.apache.pig.data.SchemaTupleBackend.newSchemaTupleFactory(SchemaTupleBackend.java:294)
           at org.apache.pig.data.SchemaTupleFactory.getInstance(SchemaTupleFactory.java:119)
           at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:350)
           at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:321)
           ... 22 more
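
      The "initialize was not called!" message comes from a guard in SchemaTupleBackend.newSchemaTupleFactory. Roughly, the pattern looks like this (an illustrative reconstruction based on the message in the trace above, not the exact Pig source):

      // Illustrative reconstruction, not the exact Pig source:
      // SchemaTupleBackend keeps a JVM-wide singleton that initialize()
      // must set, and factory lookups fail fast while it is null, even
      // when the SchemaTuple feature itself is disabled.
      public class SchemaTupleBackendSketch {
          private static SchemaTupleBackendSketch stb = null;

          public static void initialize() {
              stb = new SchemaTupleBackendSketch();
          }

          public static Object newSchemaTupleFactory() {
              if (stb == null) {
                  throw new RuntimeException("initialize was not called! Even when "
                          + "SchemaTuple feature is not set, it should be called.");
              }
              return null; // the real method returns a SchemaTupleFactory
          }
      }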
      

      It seems that SchemaTupleBackend is not correctly initialized. The reason is that after PIG-4970 we initialize SchemaTupleBackend in PigInputFormatSpark#initialize, before loading the data (stage 0), but it is never initialized in later stages (such as stage 1). So whenever a Spark job has more than one stage, this exception is thrown.
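
      One way to fix this (a minimal sketch, assuming the jobConf and PigContext can be made available inside each executor, e.g. via a Spark broadcast; the class and method names below are illustrative, not the actual PIG-5054 patch) is a per-JVM guard that every Spark converter calls before executing its part of the physical plan, so initialization happens in every stage rather than only in PigInputFormatSpark:

      import java.io.IOException;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.pig.data.SchemaTupleBackend;
      import org.apache.pig.impl.PigContext;

      // Illustrative sketch, not the actual patch: double-checked locking so
      // SchemaTupleBackend is initialized at most once per executor JVM,
      // no matter which stage runs first on it.
      public class SparkTaskInitializer {
          private static volatile boolean initialized = false;

          public static void ensureInitialized(Configuration jobConf, PigContext pigContext)
                  throws IOException {
              if (!initialized) {
                  synchronized (SparkTaskInitializer.class) {
                      if (!initialized) {
                          // Assumption: this is the same entry point that
                          // PigInputFormatSpark#initialize uses for stage 0.
                          SchemaTupleBackend.initialize(jobConf, pigContext);
                          initialized = true;
                      }
                  }
              }
          }
      }

      With such a guard, ForEachConverter could call SparkTaskInitializer.ensureInitialized(...) before POForEach.getNextTuple() runs, which is exactly the call path that fails in the trace above.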

      Attachments

        1. PIG-5054.0.patch
          5 kB
          Ádám Szita
        2. piglog
          64 kB
          Ádám Szita
        3. piglog2.txt
          90 kB
          Ádám Szita
        4. script.pig
          0.6 kB
          Ádám Szita


          People

            Assignee: Ádám Szita (szita)
            Reporter: liyunzhang (kellyzly)
            Votes: 0
            Watchers: 3
