Here is an initial patch. There are a number of issues, which I list below. There is also an example/test of how a
Python tethered job works in
Unfortunately, this test won't work for anyone but me because the path to the avro-tools.jar is currently hard-coded. One solution might be to do string substitution as part of the build process, as we do for some of the strings representing the various handshake protocols.
Here is a list of additional issues:
- Tests are a bit fragile: for example, to run the tests for avro-tools you first need to install avro-mapred in your local repository
- There are checkstyle violations
- The tests for mapred fail if the Hadoop version is 0.20.2-cdh3u0, but work for 0.20.203.0
- Hadoop 0.20.2 won't work with this version of the patch; see AVRO-852
- The shade plugin for avro-tools interferes with my use of the assembly plugin to build a *-job.jar file suitable for hadoop submission (described more below).
- In the Python task, messages logged using the logger appear to go to the stderr log rather than the stdout log
- Combine reduceFlush and reduce (described more below)
- Excess comments: I've marked changes to existing code with AVRO-570 comments to facilitate review
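On the logging issue above: one likely explanation is that Python's `logging.StreamHandler` writes to `sys.stderr` by default, so messages end up in the stderr log unless a handler is explicitly pointed at stdout. A minimal sketch (the logger name here is hypothetical, not from the patch):

```python
import logging
import sys

# StreamHandler defaults to sys.stderr; passing sys.stdout explicitly
# routes log messages to the stdout log instead.
logger = logging.getLogger("tether_task")  # hypothetical logger name
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("this message goes to stdout, not stderr")
```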
shade and assembly plugin conflict:
For avro-tools I modified the pom.xml file so that we build an avro-tools-VERSION-job.jar using the assembly plugin. The intention was to build a single jar that contains all of the dependent jars in the directory "lib" inside the jar file. Unfortunately, the shade plugin caused problems. In particular, before removing the shade plugin from the pom.xml file, the assembled avro-tools-*-job.jar would end up containing unpacked versions of all the dependency jars (in addition to storing the jars in ./lib).
I removed the shade plugin configuration as a temporary workaround. Hopefully some maven wizards out there can help me resolve this.
I think the jar plugin is building two jars: one without the dependencies and one with the dependencies unpacked. The shade plugin then seems to rename one of these jars, and I think that renaming may be part of the problem.
combining reduceFlush and reduce:
Currently, to implement the reducer in python a user would implement the methods "reduce" and "reduceFlush" in a subclass of TetherTask. "reduce" is invoked once for each tuple in the reduce phase. "reduceFlush" is invoked once after the last tuple in each group.
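To make the current two-method contract concrete, here is an illustrative subclass doing a word count; the real TetherTask base class and collector interface may differ in detail, and the record fields are made up for the example:

```python
class WordCountTask:  # would subclass TetherTask in the real code
    def __init__(self):
        self.count = 0

    def reduce(self, record, collector):
        # Invoked once for each tuple in the reduce phase:
        # accumulate state for the current group.
        self.count += record["count"]

    def reduceFlush(self, record, collector):
        # Invoked once after the last tuple in each group:
        # emit the aggregated result and reset for the next group.
        collector.collect({"word": record["word"], "count": self.count})
        self.count = 0
```

The group-finalization logic (emit and reset) has to live in a separate method from the per-record logic, which is the awkwardness the proposal below removes.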
I would like to replace reduce and reduceFlush with a single reduce function that takes a record generator, as opposed to a single record, as its argument. The implementation would look something like:
    def reduce(self, recgen, collector):
        for rec in recgen:
            ...  # process each record in this group
        ...  # finalize this group
The way I'm thinking of implementing this is to have reduce run in a separate thread from the server. The generator, recgen, would then suspend the thread when no records were available. When the server receives a record from the parent process, it would wake up the thread running reduce.
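The thread hand-off described above can be sketched with a blocking queue: `Queue.get()` suspends the reduce thread until the server enqueues the next record, and a sentinel marks the end of the group. The class and method names here are hypothetical, not part of the patch:

```python
import queue
import threading

_END_OF_GROUP = object()  # sentinel the server pushes after the last record


class GroupRunner:
    """Runs reduce() for one group in its own thread, feeding it a
    generator that blocks until the server delivers the next record."""

    def __init__(self, reduce_fn, collector):
        self._q = queue.Queue()
        self._thread = threading.Thread(
            target=reduce_fn, args=(self._records(), collector))
        self._thread.start()

    def _records(self):
        # The generator handed to reduce(); Queue.get() suspends the
        # reduce thread whenever no record is available yet.
        while True:
            rec = self._q.get()
            if rec is _END_OF_GROUP:
                return
            yield rec

    def feed(self, record):
        # Called by the server when a record arrives from the parent
        # process; waking the queue wakes the reduce thread.
        self._q.put(record)

    def finish_group(self):
        self._q.put(_END_OF_GROUP)
        self._thread.join()
```

A reduce written against this shape then reads exactly like the snippet above: iterate over the generator for the per-record work, then fall out of the loop to finalize the group.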
I've been working off of a public fork checked into Bitbucket.