Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • spark-branch
    • spark
    • None

    Description

      Related e2e-tests: ComputeSpec_[1-13]

      Attachments

        1. PIG-4209_1.patch
          20 kB
          liyunzhang
        2. PIG-4209.patch
          11 kB
          liyunzhang
        3. test_harnesss_1417075517
          42 kB
          Praveen Rachabattuni
        4. test_harnesss_1422610455
          2.24 MB
          Praveen Rachabattuni


          Activity

            kellyzly liyunzhang added a comment -

            Used PIG-4232_5.patch to test; e2e-tests StreamingLocal_[1-18] all pass.


            praveenr019 Praveen Rachabattuni added a comment -

            kellyzly All the tests are reported as skipped, because the execonly key in streaming_local.conf lists only local mode.
            After adding spark mode there, I now get a "POStream operator not supported" error. Please check the attached log.
            kellyzly liyunzhang added a comment -

            Thanks for the feedback, praveenr019. I will look into it.

            kellyzly liyunzhang added a comment -

            Edited the execonly key in streaming_local.conf.
            before:
            'execonly' => 'local'
            after:
            'execonly' => 'local,spark'

            After applying PIG-4209.patch and running e2e-tests StreamingLocal_[1-18], all tests pass except StreamingLocal_5, 9, 11, 13, 14, 16, 17 and 18.
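            The skipping discussed in this thread comes from how execonly restricts which exec types a test runs under. As a rough model only, the Java sketch below treats execonly as a comma-separated list and runs a test only when the current exec type appears in it. The class ExecOnlySketch, the method shouldRun, and the "empty list means no restriction" rule are assumptions for illustration; the actual e2e harness is written in Perl and its matching logic may differ.

```java
import java.util.Arrays;

public class ExecOnlySketch {

    // Simplified model (assumption): a test runs only if the current exec type
    // appears in its comma-separated 'execonly' list; otherwise it is skipped.
    // The real Perl e2e harness may match differently.
    static boolean shouldRun(String execonly, String exectype) {
        if (execonly == null || execonly.isEmpty()) {
            return true; // assumed: no execonly key means no restriction
        }
        return Arrays.stream(execonly.split(","))
                .map(String::trim)
                .anyMatch(exectype::equals);
    }

    public static void main(String[] args) {
        System.out.println(shouldRun("local", "spark"));       // false: skipped under spark
        System.out.println(shouldRun("local,spark", "spark")); // true: runs under spark
    }
}
```

            Under this model, the change from 'local' to 'local,spark' is exactly what turns the skipped StreamingLocal tests into real runs in spark mode.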

            kellyzly liyunzhang added a comment -

            PIG-4209_1.patch makes the following changes:
            It adds two new classes, StreamConverter.java and POStreamSpark.java.
            StreamConverter contains StreamFunction, which makes POStream work in Spark.
            POStreamSpark extends POStream and overrides only POStream#getNextTuple, adding a parameter "proceed".
            This parameter indicates whether the current record is the last input record: "proceed" is "true" if it is not the last record, and "false" if it is.

              public Result getNextTuple(boolean proceed) throws ExecException {
                .....
              }
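            To illustrate why a "proceed" flag is useful, here is a minimal, self-contained sketch in plain Java (no Spark or Pig dependencies). A partition-level driver derives proceed from input.hasNext() and passes it with each record, so a buffering operator knows when it must flush. The classes StreamFunctionSketch and StreamOperator and the batch-buffering behavior are hypothetical stand-ins; this is not the code of StreamConverter or POStreamSpark in PIG-4209_1.patch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class StreamFunctionSketch {

    // Hypothetical stand-in for a streaming operator such as POStream: it buffers
    // transformed records and releases them in batches, roughly like an external
    // process whose output lags behind its input.
    static class StreamOperator {
        private final List<String> buffer = new ArrayList<>();
        private final int batchSize;

        StreamOperator(int batchSize) {
            this.batchSize = batchSize;
        }

        // proceed == true: more input follows.
        // proceed == false: this is the last record, so flush everything buffered.
        List<String> process(String record, boolean proceed) {
            buffer.add(record.toUpperCase());
            List<String> out = new ArrayList<>();
            if (buffer.size() >= batchSize || !proceed) {
                out.addAll(buffer);
                buffer.clear();
            }
            return out;
        }
    }

    // Drives the operator over one partition, deriving "proceed" from hasNext().
    static List<String> runPartition(Iterator<String> input, StreamOperator op) {
        List<String> results = new ArrayList<>();
        while (input.hasNext()) {
            String record = input.next();
            boolean proceed = input.hasNext(); // false only for the last record
            results.addAll(op.process(record, proceed));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c");
        // Prints [A, B, C]; if proceed were always true, "C" would stay buffered.
        System.out.println(runPartition(partition.iterator(), new StreamOperator(2)));
    }
}
```

            Computing proceed from hasNext() after consuming the current record is what lets the driver signal the end of input without a separate sentinel record.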
            

            POOutputConsumerIterator also changes: POOutputConsumerIterator#readNext now uses a while loop instead of recursion. This does not change behavior, because the loop exits under exactly the same conditions under which the recursive version returns. The benefit of iteration over recursion is lower thread stack usage: the recursive version adds a stack frame for every STATUS_NULL result and for every STATUS_EOP result that is followed by more input, so a long run of such results could overflow the stack.
            Previous code:

                private void readNext() {
                    try {
                        if (result != null && !returned) {
                            return;
                        }
                        // see PigGenericMapBase
                        if (result == null) {
                            if (!input.hasNext()) {
                                finished = true;
                                return;
                            }
                            Tuple v1 = input.next();
                            attach(v1);
                        }
                        result = getNextResult();
                        returned = false;
                        switch (result.returnStatus) {
                        case POStatus.STATUS_OK:
                            returned = false;
                            break;
                        case POStatus.STATUS_NULL:
                            returned = true; // skip: see PigGenericMapBase
                            readNext();
                            break;
                        case POStatus.STATUS_EOP:
                            finished = !input.hasNext();
                            if (!finished) {
                                result = null;
                                readNext();
                            }
                            break;
                        case POStatus.STATUS_ERR:
                            throw new RuntimeException("Error while processing " + result);
                        }
                    } catch (ExecException e) {
                        throw new RuntimeException(e);
                    }
                }
            

            Code in PIG-4209_1.patch:

                private void readNext() {
                    while (true) {
                        try {
                            if (result != null && !returned) {
                                return;
                            }
                            // see PigGenericMapBase
                            if (result == null) {
                                if (!input.hasNext()) {
                                    finished = true;
            
                                    return;
                                }
                                Tuple v1 = input.next();
                                attach(v1);
                            }
                            result = getNextResult();
                            returned = false;
                            switch (result.returnStatus) {
                                case POStatus.STATUS_OK:
                                    returned = false;
                                    break;
                                case POStatus.STATUS_NULL:
                                    returned = true; // skip: see PigGenericMapBase
                                    break;
                                case POStatus.STATUS_EOP:
                                    finished = !input.hasNext();
                                    if (!finished) {
                                        result = null;
                                    }
                                    break;
                                case POStatus.STATUS_ERR:
                                    throw new RuntimeException("Error while processing " + result);
                            }
                        } catch (ExecException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
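            The equivalence and stack-depth argument above can be checked on a smaller model. The sketch below is plain Java with hypothetical names (SkipNullsDemo, SkippingIterator), not Pig code: null input elements play the role of STATUS_NULL results that must be skipped. The iterative readNext keeps a constant stack depth, while readNextRecursive, kept only for comparison, adds one frame per skipped element.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class SkipNullsDemo {

    // Minimal model of an iterator that must skip "null" results
    // (analogous to POStatus.STATUS_NULL). Hypothetical, not Pig code.
    static final class SkippingIterator implements Iterator<String> {
        private final Iterator<String> input;
        private String next;       // buffered non-null element, if any
        private boolean finished;

        SkippingIterator(Iterator<String> input) {
            this.input = input;
        }

        // Iterative version: loops until it finds a non-null element or runs out,
        // so stack depth stays constant however many nulls appear in a row.
        private void readNext() {
            while (next == null && !finished) {
                if (!input.hasNext()) {
                    finished = true;
                } else {
                    next = input.next(); // stays null for skipped elements
                }
            }
        }

        // Recursive equivalent, for comparison only: each skipped element adds a
        // stack frame, so a long run of nulls can throw StackOverflowError.
        private void readNextRecursive() {
            if (next != null || finished) return;
            if (!input.hasNext()) { finished = true; return; }
            next = input.next();
            readNextRecursive();
        }

        @Override
        public boolean hasNext() {
            readNext();
            return next != null;
        }

        @Override
        public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            String result = next;
            next = null;
            return result;
        }
    }

    public static void main(String[] args) {
        String[] data = new String[1_000_001]; // one million nulls, then "last"
        data[data.length - 1] = "last";
        Iterator<String> it = new SkippingIterator(Arrays.asList(data).iterator());
        System.out.println(it.next()); // prints "last" without deep recursion
    }
}
```

            Calling readNextRecursive instead of readNext on the same input would, with a default JVM thread stack, typically fail with StackOverflowError.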
            

            praveenr019 Praveen Rachabattuni added a comment -

            Got all ComputeSpec tests passing in Spark local mode; the log file is attached.

            ant -Dhadoopversion=23 -Dharness.cluster.conf=$HADOOP_HOME/conf -Dharness.cluster.bin=$HADOOP_HOME/bin -Dharness.hadoop.home=$HADOOP_HOME -Dtests.to.run="-t ComputeSpec" test-e2e-spark
                  ......
                 [exec] Final results ,    PASSED: 13   FAILED: 0    SKIPPED: 0    ABORTED: 0    FAILED DEPENDENCY: 0

            Resolving this issue. Thanks, kellyzly.

            People

              kellyzly liyunzhang
              praveenr019 Praveen Rachabattuni
              Votes: 0
              Watchers: 2
