AVRO-570

Python implementation of mapreduce connector

    Details

    • Type: New Feature
    • Status: Patch Available
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: python
    • Labels:
    • Tags: hadoop, mapreduce, tethered, python

      Description

      AVRO-512 defines protocols for implementing mapreduce tasks. It would be good to have a Python implementation of this.
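To make the goal concrete, here is a library-free sketch of the map/reduce contract a tethered word-count task would implement. This is illustrative only: the real task subclasses the tether task API from the patch, and the function names below are made up.

```python
from collections import defaultdict

def map_phase(lines):
    """Emit (word, 1) pairs, as a tethered mapper would via its collector."""
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(pairs):
    """Sum the counts per word, as the reducer side would."""
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

counts = reduce_phase(map_phase(["the quick fox", "the lazy dog"]))
# counts == {"the": 2, "quick": 1, "fox": 1, "lazy": 1, "dog": 1}
```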

      1. AVRO-570.patch
        105 kB
        Jeremy Lewi
      2. AVRO-570.patch
        105 kB
        Jeremy Lewi
      3. AVRO-570.patch
        108 kB
        Jeremy Lewi
      4. AVRO-570.patch
        106 kB
        Jeremy Lewi
      5. AVRO-570.patch
        114 kB
        Jeremy Lewi
      6. AVRO-570.patch
        114 kB
        Jeremy Lewi
      7. AVRO-570.patch
        120 kB
        Jeremy Lewi

          Activity

          Jeremy Lewi added a comment -

          I should also mention that I think this warning

          /home/jlewi/tmp/trunk/lang/py/build/src/avro/tether/tether_task_runner.py:24: RuntimeWarning: Parent module 'avro.tether' not found while handling absolute import
            from avro import tether
          

          is misleading. I'm seeing it as well when I run the unittests and my unittests are passing.
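The warning typically reflects how the module was launched rather than a genuine import failure: a file executed directly has no parent package set up, while `-m` runs it inside one. A self-contained demonstration of that difference (the package name `pkg` here is made up):

```python
import os
import subprocess
import sys
import tempfile

# Build a throwaway package on disk and run the same module two ways.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "pkg"))
open(os.path.join(root, "pkg", "__init__.py"), "w").close()
with open(os.path.join(root, "pkg", "mod.py"), "w") as f:
    f.write("print(__package__)\n")

# Run as a plain file: no parent package is imported first.
direct = subprocess.run([sys.executable, os.path.join(root, "pkg", "mod.py")],
                        capture_output=True, text=True).stdout.strip()

# Run with -m: the parent package 'pkg' is imported before the module runs.
env = dict(os.environ, PYTHONPATH=root)
via_m = subprocess.run([sys.executable, "-m", "pkg.mod"],
                       capture_output=True, text=True, env=env).stdout.strip()
```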

          Jeremy Lewi added a comment -

          Doug, I'm a little confused by the stack trace that's being printed out. The stack trace shows that "test_tether_word_count.py" is throwing the exception. However, the text (e.g.)

          {"name": "ReferencedRecord", "type": "record", 
          

          looks like schemas that I think are being used in a different test, "test_protocol.py" which shouldn't have anything to do with the mapreduce API. (Is it possible ant runs the tests in different threads and jumbles the stdout from the different tests?).

          My suggestion would be to run the individual tests manually as follows. You'll need to grab my latest patch, as I fixed a couple of issues that would otherwise prevent this.
          Build the Python files and Java jars:

          cd SVN_AVRO/lang/java
          mvn package -DskipTests
          cd SVN_AVRO/lang/py/
          ant dist
          

          Set your Python path:

          export PYTHONPATH=SVN_AVRO/lang/py/build/src:SVN_AVRO/lang/py/build/test
          

          Run the individual tests:

          cd SVN_AVRO/lang/py
          python build/test/test_tether_task.py 
          python build/test/test_tether_task_runner.py 
          python build/test/test_tether_word_count.py 
          

          Please send me the complete output (not just stack traces) of the tests that fail as that will help me troubleshoot the problem.

          Thanks for your patience and help.

          Jeremy Lewi added a comment -

          Here's an updated patch. It doesn't fix the problems Doug's having. It does, however, fix typos that would prevent the python unittests from being run manually.

          Doug Cutting added a comment -

          I don't seem to have avro installed in /usr/lib/python. The tests you describe above find the version in my build directory, as expected. The scripts in /tmp look fine. Yet I still see:

            [py-test] ./home/cutting/src/avro/trunk/lang/py/build/src/avro/tether/tether_task_runner.py:24: RuntimeWarning: Parent module 'avro.tether' not found while handling absolute import
            [py-test]   from avro import tether
          
          Jeremy Lewi added a comment -

          Doug,

          Sorry for not responding sooner (gmail failed to flag the email as important and it got lost in the deluge).

          It looks like the python path isn't getting set correctly in the tethered task and it can't find the tether module. My guess is that you have previously installed the python avro module and this is what is getting found when you do

          from avro import tether
          

          (i.e., something similar to https://issues.apache.org/jira/browse/AVRO-849).

          Can you try removing any previously installed avro modules? E.g.:

          rm -rf /usr/lib/python2.7/site-packages/avro-*
          

          Of course, you'll want to substitute the correct path for your Python installation.

          One other thing to look at is the output of the python tests. Look for a line like the following

           [py-test] Command:
           [py-test] 	hadoop-0.20 jar /home/jlewi/svn_avro/lang/java/tools/target/avro-tools-1.6.1-job.jar tether --in /tmp/mapred/in --out /tmp/mapred/out --outschema /tmp/wordcounterN9SD.avsc --protocol http --program /tmp/exec_word_count_9vUzRg
          

          The file

          /tmp/exec_word_count_??????
          

          is a temporary file that gets written on each invocation of the tests so the suffix will change on each invocation. This file is a bash script that gets executed to start the tethered process.

          The contents of the file should be something like

          #!/bin/bash
          export PYTHONPATH=/home/jlewi/svn_avro/lang/py/build/src:/home/jlewi/svn_avro/lang/py/build/test
          python -m avro.tether.tether_task_runner word_count_task.WordCountTask
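The test harness presumably generates that launcher on each run with something along these lines; this is a hypothetical sketch, and the paths and the `write_launcher` name are examples only, not the actual test code.

```python
import os
import stat
import tempfile

def write_launcher(pythonpath, task_class):
    """Write an executable bash launcher for the tethered task (sketch)."""
    script = ("#!/bin/bash\n"
              "export PYTHONPATH={}\n"
              "python -m avro.tether.tether_task_runner {}\n").format(pythonpath, task_class)
    # mkstemp gives a fresh suffix per invocation, matching the changing
    # /tmp/exec_word_count_?????? names seen in the test output.
    fd, path = tempfile.mkstemp(prefix="exec_word_count_")
    with os.fdopen(fd, "w") as f:
        f.write(script)
    os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)
    return path

path = write_launcher("/example/build/src:/example/build/test",
                      "word_count_task.WordCountTask")
```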
          

          Can you take a look at your file and verify that the Python path is set correctly in your case? I.e., you could try executing the following in a shell:

          export PYTHONPATH=/home/jlewi/svn_avro/lang/py/build/src:/home/jlewi/svn_avro/lang/py/build/test
          

          then start an interactive Python session and execute

          from avro import tether
          

          If you get an exception then there's a problem importing avro: either 1) the path isn't set correctly, or 2) there's an older version of avro with higher precedence on the path. If it's the latter, you can try the following commands to identify which avro it's picking up:

          import avro
          avro.__file__
          

          J
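A quick way to script that precedence check, for anyone following along: `importlib.util.find_spec` reports the file Python would load for a module name without importing it. The snippet below uses the stdlib `json` module as a stand-in; substitute `"avro"` when diagnosing the real issue.

```python
import importlib.util

def locate(module_name):
    """Return the file Python would load for module_name, or None if absent."""
    spec = importlib.util.find_spec(module_name)
    return spec.origin if spec else None

# sys.path order decides precedence; an older install that appears earlier
# on the path wins over the build directory.
origin = locate("json")  # e.g. locate("avro") to find a stray install
```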

          Doug Cutting added a comment -

          Finally looking at this. The Java changes look reasonable and all Java tests pass for me.

          Python tests fail with:

          
          [py-test] ./home/cutting/src/avro/trunk/lang/py/build/src/avro/tether/tether_task_runner.py:24: RuntimeWarning: Parent module 'avro.tether' not found while handling absolute import
            [py-test]   from avro import tether
            [py-test] INFO:root:tether_task_runner.__main__: Task: word_count_task.WordCountTask
            [py-test] INFO:TetherTask:TetherTask.open: Opening connection to parent server on port=42343
            [py-test] MockParentResponder: Recieved 'configure': inputPort=59800
            [py-test] localhost.localdomain - - [23/Nov/2011 15:15:17] "POST / HTTP/1.1" 200 -
            [py-test] .E
            [py-test] ======================================================================
            [py-test] space", 
            [py-test] ERROR: test1 (test_tether_word_count.TestTetherWordCount)
            [py-test]        "fields": [ {"name": "foo", "type": "string"} ] },
            [py-test] ----------------------------------------------------------------------
            [py-test]      {"name": "ReferencedRecord", "type": "record", 
            [py-test] Traceback (most recent call last):
            [py-test]        "fields": [ {"name": "bar", "type": "double"} ] },
            [py-test]   File "/home/cutting/src/avro/trunk/lang/py/build/test/test_tether_word_count.py", line 187, in test1
            [py-test]      {"name": "TestError",
            [py-test]     proc=subprocess.Popen(args)
            [py-test]       "type": "error", "fields": [ {"name": "message", "type": "string"} ]
            [py-test]   File "/usr/lib/python2.6/subprocess.py", line 623, in __init__
            [py-test]      }
            [py-test]     errread, errwrite)
            [py-test]  ],
            [py-test]   File "/usr/lib/python2.6/subprocess.py", line 1141, in _execute_child
            [py-test] 
            [py-test]     raise child_exception
            [py-test]  "messages": {
            [py-test] OSError: [Errno 2] No such file or directory
            [py-test]      "echo": {
            [py-test] 
            [py-test]          "request": [{"name": "qualified", 
            [py-test] ----------------------------------------------------------------------
            [py-test]              "type": "ReferencedRecord"}],
            [py-test] Ran 45 tests in 8.061s
          

          Any idea what's causing that?

          Jeremy Lewi added a comment -

          Here's an updated patch ready for review.

          Jeremy Lewi added a comment -

          Found the bug. Should have a patch ready for review tonight.

          Jeremy Lewi added a comment -

          This is an updated patch that resolves subversion conflicts. Unfortunately, not all unittests are passing yet.

          TestWordCountTether fails when using the HTTP Protocol. The problem appears to be the following. After the mapper finishes, TetherMapRunner sends a "complete" message to the tethered process and the tethered process sends back a "complete" message and then exits. Based on the log messages, the messages are getting sent and received. However, TetherMapRunner's sending of "complete" results in a connection refused exception. This appears to only happen with the HTTP Protocol and not the SASL protocol.

          The python code which is also using the HTTP protocol works just fine.

          Any help or ideas would be appreciated.

          Jeremy Lewi added a comment -

          I'll try to get to it next week or the week after so that it can hopefully go into 1.6.1, assuming no incompatible changes.

          Doug Cutting added a comment -

          I'm hoping to release 1.6.0 in the next week or two. As long as this doesn't make any incompatible changes it could go into 1.6.1 which will likely follow in a month or so. If it makes incompatible changes and doesn't make 1.6.0 then it wouldn't go out until 1.7.0, probably sometime in the first half of 2012.

          Jeremy Lewi added a comment -

          I'll be happy to update the patch. What kind of timeline do you have in mind?

          Doug Cutting added a comment -

          > SASL isn't supported in python and python is the only language other than java supporting tethered jobs so SASL probably isn't a good choice either.

          Implementing SASL's anonymous mechanism on a TCP socket is very simple.

          http://avro.apache.org/docs/current/sasl.html#anonymous

          But, in the meantime we can use HTTP to get the first version of this debugged & committed.

          This patch has unfortunately gone stale since I last had a chance to inspect it. Sorry! If you have a chance, can you please update it? I'd still really like to see this functionality in an Avro release soon.

          Jeremy Lewi added a comment -

          Doug, I appreciate your careful review, and I've attempted to fix all the issues you pointed out in this patch.

          TRANSPROTO has been renamed to PROTOCOL and I added some javadoc. The code also uses "switch" instead of "if-else" blocks to process the protocol.

          I would argue that currently we need a NONE value for PROTOCOL because there is no intelligent default. From my limited understanding of protocols, HTTP is a very inefficient protocol for tethered jobs, so HTTP probably isn't a good default. On the other hand, SASL isn't supported in Python, and Python is the only language other than Java supporting tethered jobs, so SASL probably isn't a good choice either.

          I renamed the python test target "test-570" to "test-tether" so that we don't reference a JIRA issue. Is this acceptable?

          I tried to address all of the cosmetic issues Doug pointed out. It does look like SVN is reporting some whitespace-only changes, but I think this might be because I changed some tabs to spaces.

          I'd also like to call the reviewer's attention to the fact that I've changed the hadoop version for the dependency in the POM. (See AVRO-852).

          Jeremy Lewi added a comment -

          Thanks Doug for taking the time to review the patch. I will start incorporating your comments, and hopefully in the meantime some Python and Maven experts will jump in with additional comments. Sorry about all the whitespace problems; I thought checkstyle would find all the "tabs".

          Doug Cutting added a comment -

          Great to see this working!

          Some cosmetic comments on the patch:

          • please don't include comments with your name, the date, stack traces, etc.
          • please don't use tabs for indentation, but only spaces, 2 per indent level
          • please check your patch to make sure no whitespace-only changes are made
          • please put a space after // and before {

          These guidelines make it much easier to review patches.

          For the enum TRANSPROTO:

          • this might better be called simply TRANSPORT or PROTOCOL or maybe TRANSPORT_PROTOCOL.
          • if it's public in a public, non-test class then it needs javadoc
          • I'd prefer that, when values of this are processed, a switch statement with a 'default' clause that throws an exception is used rather than an if-else structure; this makes it clearer that all values are handled.
          • do we need a NONE value, or should we rather simply default the value to HTTP or SASL?
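The dispatch pattern suggested above (handle every known value explicitly and fail loudly on anything else, rather than letting an if-else chain fall through silently) can be sketched as follows. This is an illustrative Python analogue of the Java switch-with-default idea; the `Protocol` and `open_transceiver` names are hypothetical, not the patch's actual identifiers.

```python
from enum import Enum

class Protocol(Enum):
    HTTP = "http"
    SASL = "sasl"

def open_transceiver(proto):
    """Dispatch on the protocol; raise on anything unhandled."""
    if proto is Protocol.HTTP:
        return "http transceiver"
    if proto is Protocol.SASL:
        return "sasl transceiver"
    # The 'default' clause: an unhandled value is a bug, not a no-op.
    raise ValueError("unhandled protocol: {!r}".format(proto))
```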

          The need for the flush in TetherKeySerialization is mysterious, since that's using a DirectBinaryEncoder, which has no buffering. So the flush only has an effect on the underlying OutputStream. If that's required then it may be a bug in Hadoop. Perhaps rather than including the entire stack trace here, file a separate Jira issue about this with the stack trace, then just add an end-of-line comment referring to this issue, e.g., 'flush(); // Possible bug, see: AVRO-XXX'

          We shouldn't reference Jira issue numbers in sources except in comments. So, instead of adding a test-570 target in lang/py/build.xml you might define test.include and test.exclude properties near the top of that build.xml, then run it with something like 'ant -Dtest.include=...'.

          It would be great if someone more fluent in Python could look over the Python stuff here.

          And it would be great if someone more fluent in Maven could look over the Maven stuff here.

          Jeremy Lewi added a comment -

          Here's a patch that I think is ready for review.

          I also created a wiki article to explain how tethered jobs work.

          https://cwiki.apache.org/confluence/display/AVRO/Using+AVRO+To+Run+Python+Map+Reduce+Jobs

          Jeremy Lewi added a comment -

          Cleaned up some white space.

          Jeremy Lewi added a comment -

          Filed a separate issue for the broken unittest TestWordCount.testProjection.

          Jeremy Lewi added a comment -

          It looks like the test failure is in trunk (not part of the patch) and is in TestWordCount.testProjection. There seem to be at least two problems.

          First, testProjection appears to be using the wrong schema for reading the mapred/target/mapred/out/part-0000.avro.

          Second, even after attempting to correct the schema I was still getting an error trying to read records.

          Jeremy Lewi added a comment -

          There's one test failure in mapred that I will work on fixing and hopefully upload an updated patch soon.

          FYI, to run just the tests for mapred I had to first install all the modules:

          cd AVRO/lang/java
          mvn install -DskipTests
          cd AVRO/lang/java/mapred
          mvn test
          
          Jeremy Lewi added a comment -

          This is an updated patch.

          It implements the Maven workaround that I described in my earlier comment.

          I fixed all of the check style violations.

          I'm trying to run the unittests for mapred and tools, but Maven seems to abort on the test failures in ipc and I don't know how to make Maven run just the tests for mapred.

          I'm working on a wiki article to provide some useful instructions on how to write tethered jobs in python.

          Jeremy Lewi added a comment -

          I've found a possible workaround for the Maven issue. I can adjust the shade plugin so that it builds a jar, avro-tools-1.6.0-SNAPSHOT-withdeps.jar, which is equivalent to the current output of the shade plugin (i.e., the ./tools/target/avro-tools-1.6.0-SNAPSHOT.jar).

          avro-tools-1.6.0-SNAPSHOT-withdeps.jar would contain unpacked versions of all dependency jars.

          avro-tools-1.6.0-SNAPSHOT.jar would only contain the code for avro-tools (i.e., no dependencies).

          avro-tools-1.6.0-SNAPSHOT-job.jar would be a jar suitable for hadoop.

          So the only change to existing code would be that the jar generated by the shade plugin is renamed to avro-tools-1.6.0-SNAPSHOT-withdeps.jar. Not sure what this would break, if anything.

          Doug Cutting added a comment -

          It'd be great to get this working! Does anyone understand the Maven issue with generating the job.jar?

          Jeremy Lewi added a comment -

          Here is an initial patch. There are a number of issues which I list below. There is also an example/test of how a
          python tethered job works in

           
          lang/py/test/test_tether_word_count.py 
          

          Unfortunately, this test won't work for anyone but me because the path to the avro-tools.jar is currently hard-coded. One solution might be to do string substitution as part of the build process, as we do for some of the strings representing the various handshake protocols.
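          As an interim alternative to build-time string substitution, the test could resolve the jar at run time. A sketch: check a hypothetical AVRO_TOOLS_JAR environment variable first, then fall back to a glob (the relative path below is only a guess at the source layout, not the patch's actual behavior):

          
```python
import glob
import os


def find_avro_tools_jar(version_pattern="*"):
    """Locate the avro-tools jar without hard-coding its path (a sketch).

    AVRO_TOOLS_JAR is a hypothetical override for tests; the glob below
    assumes the jar was built under lang/java relative to this file.
    """
    env_jar = os.environ.get("AVRO_TOOLS_JAR")
    if env_jar and os.path.exists(env_jar):
        return env_jar
    here = os.path.dirname(os.path.abspath(__file__))
    pattern = os.path.join(here, "..", "..", "java", "tools", "target",
                           "avro-tools-%s.jar" % version_pattern)
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise RuntimeError(
            "Could not locate avro-tools jar; set AVRO_TOOLS_JAR")
    return matches[0]
```
          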

          Here is a list of additional issues:

          1. Tests are a bit fragile - for example, to run the tests for avro-tools you first need to install avro-mapred in your local repository
          2. There are checkstyle violations
          3. The tests for mapred fail if the hadoop version is 0.20.2-cdh3u0, but work for 0.20.203.0
          4. Hadoop 0.20.2 won't work with this version of the patch; see AVRO-852
          5. The shade plugin for avro-tools interferes with my use of the assembly plugin to build a *-job.jar file suitable for hadoop submission (described more below)
          6. In the python task, messages logged using logger appear to go to the stderr log rather than the stdout log file
          7. Combine reduceFlush and reduce (described more below)
          8. Excess comments - I've commented changes to existing code with AVRO-570 to facilitate review

          shade and assembly plugin conflict:
          For avro-tools I modified the pom.xml file so that we build an avro-tools-VERSION-job.jar using the assembly plugin. The intention was to build a single jar that contains all of the dependent jars in the directory "lib" inside the jar file. Unfortunately, the shade plugin caused problems. In particular, before removing the shade plugin from the pom.xml file, the assembled avro-tools-*-job.jar would end up containing unpacked versions of all the dependency jars (in addition to storing the jars in ./lib).

          I removed the shade plugin configuration as a temporary workaround. Hopefully some Maven wizards out there can help me resolve this.

          I think the jar plugin is building two jars: one without the dependencies and one with the dependencies unpacked. The shade plugin then seems to rename one of these jars, and I think that renaming may be part of the problem.

          combining reduceFlush and reduce:
          Currently, to implement the reducer in python a user would implement the methods "reduce" and "reduceFlush" in a subclass of TetherTask. "reduce" is invoked once for each tuple in the reduce phase. "reduceFlush" is invoked once after the last tuple in each group.
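          Under the current two-method API, a word-count style reducer might look roughly like the following. The method names come from the description above, but the record layout and the collector interface are assumptions for illustration:

          
```python
class WordCountReducer(object):
    """Sketch of the current API; a real task would subclass
    avro.tether.TetherTask. The {"key": ..., "value": ...} record shape
    and collector.collect are assumptions for illustration."""

    def __init__(self):
        self._word = None
        self._count = 0

    def reduce(self, record, collector):
        # Invoked once for each tuple in the group.
        self._word = record["key"]
        self._count += record["value"]

    def reduceFlush(self, record, collector):
        # Invoked once after the last tuple of each group.
        collector.collect({"key": self._word, "value": self._count})
        self._word = None
        self._count = 0
```
          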

          I would like to replace reduce and reduceFlush with a single reduce function that would take a record generator as opposed to a record as its argument. So the implementation would look something like

           
          def reduce(self, recgen, collector):
              for rec in recgen:
                  ...  # process each record in this group
              ...      # finalize this group
          

          The way I'm thinking of implementing this is to have reduce run in a separate thread from the server. The generator, recgen, would then suspend the thread when no records were available. When the server receives a record from the parent process, it would wake up the thread running reduce.
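          A minimal sketch of that scheme, using a Queue to hand records from the server thread to the reduce thread (all names here are illustrative, not from the patch):

          
```python
import threading
try:
    import Queue as queue  # Python 2, matching the era of this patch
except ImportError:
    import queue           # Python 3

_END = object()  # sentinel marking the end of a group


def record_generator(q):
    """Yield records handed over by the server thread; q.get() blocks
    the reduce thread until a record (or the end sentinel) arrives."""
    while True:
        rec = q.get()
        if rec is _END:
            return
        yield rec


def run_reduce(reduce_fn, q, collector):
    """Run the user's single reduce(recgen, collector) on its own thread."""
    reduce_fn(record_generator(q), collector)


# Usage sketch: the user's combined reduce processes one whole group.
def sum_reduce(recgen, collector):
    total = 0
    for rec in recgen:  # suspends until the server thread enqueues a record
        total += rec["value"]
    collector.append(total)  # finalize this group


out = []
q = queue.Queue()
t = threading.Thread(target=run_reduce, args=(sum_reduce, q, out))
t.start()
for v in (1, 2, 3):  # the server would enqueue records as they arrive
    q.put({"value": v})
q.put(_END)  # end of group wakes the generator for the final time
t.join()
# out now holds [6]
```
          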

          I've been working off of a public fork checked into bitbucket
          https://bitbucket.org/jlewi/avro

          Jeremy Lewi added a comment -

          Filed a related issue.

          Jeremy Lewi added a comment -

          Happy to contribute to such a fantastic project. I have most of the python code written and at least partially tested. I'm working on integrating with the java code. I'm hoping to submit a draft tomorrow to begin soliciting feedback from interested parties.

          Doug Cutting added a comment -

          Jeremy, that sounds great. Thanks for working on this!

          Jeremy Lewi added a comment -

          I think I'm going to take a stab at this using the HTTP transport mechanism, because it is already supported in python and my understanding of the transport mechanisms is quite limited. Hopefully by the time I'm done, AVRO-703 will be done and we can switch to using the Socket mechanism.

          I'm aiming to have a first cut done by next Wednesday.

          Jeremy Lewi added a comment -

          I'm marking this issue as blocked by AVRO-703 because that would provide support for the Socket transport mechanism, which supports request-only (one-way) messages.

          Doug Cutting added a comment -

          This could be implemented without one-way messages, but performance would be bad, so I'm marking this as dependent on them.


            People

            • Assignee:
              Jeremy Lewi
              Reporter:
              Doug Cutting