Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: Core, MapReduce Patterns
    • Labels: None

      Description

      The fact that we have special-case code in MapsideJoinStrategy for the in-memory and MR-based Pipeline instances has always bugged me, so I set out to eliminate the distinction between the two impls. I did this by creating a new interface, ReadableSourceBundle<T>, that encapsulates the MR- and in-memory-specific logic for doing mapside joins. This removes the special-case code in MapsideJoinStrategy and should make other implementations that use our mapside-join patterns much easier to test.
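To make the idea concrete, here is a minimal, hypothetical sketch of the pattern (plain Java, not the actual Crunch API; all names here are illustrative):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, NOT the actual Crunch API: the core idea is a single
// interface that hides whether records come from an in-memory collection or
// from files read off the distributed cache inside an MR task.
interface ReadableData<T> {
  Iterable<T> read();
}

// Trivial in-memory implementation, roughly what a MemPipeline-backed
// pipeline could hand to a join strategy.
class InMemoryReadableData<T> implements ReadableData<T> {
  private final List<T> contents;

  InMemoryReadableData(List<T> contents) {
    this.contents = contents;
  }

  @Override
  public Iterable<T> read() {
    return contents;
  }
}

public class ReadableDataSketch {
  // A join strategy written against the interface needs no special-case
  // code for in-memory vs. MR pipelines.
  static <T> int count(ReadableData<T> data) {
    int n = 0;
    for (T ignored : data.read()) {
      n++;
    }
    return n;
  }

  public static void main(String[] args) {
    ReadableData<String> data = new InMemoryReadableData<>(Arrays.asList("a", "b"));
    System.out.println(count(data)); // prints 2
  }
}
```

An MR-backed implementation would implement the same interface but read from the distributed cache, so callers like a join strategy never need to know which pipeline they are running in.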

      1. CRUNCH-278d.patch
        133 kB
        Josh Wills
      2. CRUNCH-278c.patch
        132 kB
        Josh Wills
      3. CRUNCH-278b.patch
        79 kB
        Josh Wills
      4. CRUNCH-278.patch
        56 kB
        Josh Wills

        Issue Links

          Activity

          Josh Wills added a comment -

          And committed. Thanks Gabriel Reid!

          Gabriel Reid added a comment -

          +1

          Josh Wills added a comment -

          Gabriel Reid thanks for the reviews on this. Here's the javadoc for the MapsideJoinStrategy.

          Gabriel Reid added a comment -

          Awesome, super nice way of building in the backwards compatibility there.

          Just one last thing that I think needs doing: javadoc on the constructor of MapsideJoinStrategy to explain what the materialize parameter means.

          Josh Wills added a comment -

          All good points-- here's a patch that incorporates the changes as diffs relative to 278b (I'll rebase them before I submit this, just wanted to make it easier to review.)

          I think you're right that we need to keep the default behavior the same for mapside joins-- it would be way too surprising for people upgrading. I handled that by adding a boolean materialize argument to asReadable() and creating a constructor for MapsideJoin that allows you to set it to false so folks can experiment with it safely.
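The backwards-compatible default described here can be sketched like so (a hedged illustration only; the class and method names are hypothetical, not the real Crunch signatures):

```java
// Hypothetical sketch of the default-preserving constructor pattern;
// names are illustrative, not the real Crunch API.
public class JoinStrategySketch {
  private final boolean materialize;

  // No-arg constructor keeps the old default: materialize the small side first.
  public JoinStrategySketch() {
    this(true);
  }

  // Opt-in constructor lets users experiment with reading straight from the source.
  public JoinStrategySketch(boolean materialize) {
    this.materialize = materialize;
  }

  public String plan() {
    // In the real strategy, this flag would decide whether an MR job is
    // forced to materialize the small side before the join.
    return materialize ? "materialize-then-join" : "read-source-in-mapper";
  }

  public static void main(String[] args) {
    System.out.println(new JoinStrategySketch().plan());      // prints materialize-then-join
    System.out.println(new JoinStrategySketch(false).plan()); // prints read-source-in-mapper
  }
}
```

The point of the pattern is that existing callers who use the no-arg constructor see no behavior change when they upgrade.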

          Gabriel Reid added a comment -

          Looks good to me in general, and it seems to work as it should.

          About the name, yeah, ReadableData isn't super clear, but it's of course difficult to think of something better. DataSourceIterable? MaterializableDataStream? MaterializableIterable?

          A few other things I came up with while going over it:

          • if you don't materialize a PCollection in a whole stream of DoFns between a Source and where a ReadableData is being used (e.g. MapsideJoinStrategy), then everything gets run in-memory. I know that this is what was discussed/agreed upon, but it's also a pretty big change from the previous (default) behaviour, which worries me a little bit. Maybe we should at least add some javadoc in MapsideJoin to make this clear.
          • I think that DelegatingReadableData and DoFnIterator should probably be moved to the o.a.c.impl.mr.collect package. They're only used from that package, right? And they're currently in o.a.c.util, which I think is intended for user-facing utils.
          • in the default case (ReadableData is read directly from the source and DoFns are run there), it doesn't show up in the job plan dotfile at all. Definitely not a reason to block this ticket, but it is something that should be addressed in a follow-up ticket.
          • the file headers seem a little wonky. I saw a couple of files with Cloudera headers, which I'm guessing is a bigger issue, as well as a few with some kind of messed-up pasted-in file headers (i.e. TrevniReadableData.java). I know that's being super nit-picky (and I totally feel like I'm being that guy for pointing it out), but I guess that's stuff that's probably best to keep in order.
          Josh Wills added a comment -

          Here's my second cut at this, which reflects our discussion over the past week or so. This is the full version, which can support processing DoFns in-memory on the cluster for small data sources in order to save an MR job.

          Couple of concerns: the name of the object (ReadableData<T>) isn't the greatest, and I'd welcome other suggestions. Also, I'm not supporting using a PGroupedTable with this API right now b/c I think we need to fix how we serialize grouped data in a different JIRA that will follow this one.

          Gabriel Reid added a comment -

          Yeah, I think that that could work for the more general case. Calling toBundle on a PCollection would then back up to the last call to materialize and execute everything from there on in memory, and the default case is to do nothing in memory.

          The only issue I see with this is that it makes the materialize() call into something that visibly mutates the state of a PCollection. Materializing a PCollection mutates state under the covers anyhow, but adding these semantics to materialize very slightly breaks the idea of immutability around PCollection. That's probably not a big enough reason to not take this approach though.

          Josh Wills added a comment -

          I agree that this is a fuzzy area and an important one to get right in terms of the APIs.

          For the first case: if we marked muchSmallerTable as materialized (via a call to materialize()), we could use that as a signal, no? Does that work in a more general case?

          Gabriel Reid added a comment -

          There's still something here that I think I'm not totally getting, or I'm looking at this from the wrong angle.

          Taking two different cases of the same kind of thing, I think we need to be able to distinguish how we want them to be dealt with, as follows.

          In this example, we want the DoFns leading up to the toBundle() call to be run in a separate job:

          PTable<K,V> hugeTable = pipeline.read(...);
          PTable<K,V> muchSmallerTable = hugeTable.parallelDo(myFilterFn);
          // Now MapsideJoinStrategy will call muchSmallerTable.toBundle(), but we want
          // myFilterFn to have run in a separate MR job leading up to this MapsideJoin because
          // it's reducing a huge amount of data into something that fits in memory
          PTable<K, Pair<U, V>> joined = new MapsideJoinStrategy().join(left, muchSmallerTable);
          

          and in this example we want the DoFns to be run in memory while directly reading in smallTable from the Source

          PTable<K,V> smallTable = pipeline.read(...);
          PTable<K,V> filteredSmallTable = smallTable.parallelDo(myFilterFn);
          // Now MapsideJoinStrategy will call filteredSmallTable.toBundle(), and we want
          // myFilterFn to be run in each of the mappers while materializing the contents 
          // of smallTable, because there's no need to kick off a separate MR job for that
          // due to the small size of the data
          PTable<K, Pair<U, V>> joined = new MapsideJoinStrategy().join(left, filteredSmallTable);
          

          I think that the point is that there needs to be the ability in the API to set a spot where we can say "everything from here on in will be run in memory". We can do that with something on ParallelDoOptions, but I think we can run into the same problem again where it's hard to define what will (or even what should) happen if you want to write a PCollection to storage if it's got some in-memory operations defined somewhere further upstream in the Pipeline.

          FWIW, I'm pretty much convinced that the MaterializedPCollection isn't the way to go for this.

          Micah Whitacre added a comment -

          The MaterializedPCollection seems nice because it meshes nicely with metaphors already in Crunch, but it seems dangerous for the ill-informed consumer. Specifically, since the PCollection can be passed around, it might be passed to functionality that expects to be able to persist the collection, which would then encounter the issue.

          Therefore the bundle approach seems nice because it clearly sets that distinction. To confirm though if we went with this approach...

          PTable<K, V> cnt = stuff.count();
          ReadableSourceBundle<Pair<K, V>> bundle = cnt.toBundle();

          Consumers could still do whatever processing/persisting they wanted with the "cnt" value, correct? So the cnt.toBundle() would have no effect on it? Also, would GBKs be allowed prior to creating the bundle? In HBase, rows can be broken up in a PTable due to the configured batch size, which could potentially require that grouping.

          Gabriel Reid added a comment -

          Another idea I just had for having an option in the API to do stuff in-memory would be to just add it to ParallelDoOptions. On the other hand, this might make things confusing – i.e. what would it mean if you call pipeline.write() with a PCollection that was created by a call to parallelDo with the "do-fns-in-memory" flag turned on?

          Gabriel Reid added a comment -

          Ok, I get it.

          Making it possible in the API to specify the boundary between the MR job and in-memory execution is what I was going for with the MaterializedPCollection constructor that I posted before (copied here below).

          PTable<ImmutableBytesWritable,Result> htableContents = pipeline.read(FromHBase.table());
          PTable<A,B> convertedHTable = new MaterializedPCollection(htableContents).parallelDo(new DoSomethingFn());
          PTable<A,Pair<C,B>> joined = new MapsideJoinStrategy().join(anotherPTable, convertedHTable);
          

          My idea was that everything coming out of the MaterializedPCollection would be done in memory, so you could have something that was being calculated upstream in the pipeline be read into memory starting from the point where you instantiated a MaterializedPCollection.

          In any case, yeah, I think it would be pretty important to be able to clearly specify which things you want done in MR and which you want done in memory.

          Josh Wills added a comment -

          I don't think it would prevent using the library functions-- it would just look like:

          PCollection<K> stuff = ...;
          PTable<K, V> cnt = stuff.count();
          ReadableSourceBundle<Pair<K, V>> bundle = cnt.toBundle();

          i.e., you would just do whatever transforms you wanted to apply before converting the PCollection to a bundle.

          That said, I think that if we had an instance like this, we would be inclined to kick off an MR job vs. just reading the data in memory and doing the count/aggregation ourselves using the in-memory pipeline. Maybe an option on the API to control that?

          Gabriel Reid added a comment -

          Ok, got it.

          The biggest trade-off I see with using ReadableSourceBundle vs something that implements PCollection for the in-memory thing is that using ReadableSourceBundle will prevent the issue with invalid write calls, but also prevent using library functions like the stuff in o.a.c.lib.*, such as Distinct.

          I'm sure that there are some ways we could get around the Pipeline#write issue, but there's a good chance that some/all of them would come at the cost of being confusing for users.

          Josh Wills added a comment -

          Yeah, that's essentially it; the difference I had in mind was that the object that you would create that would represent the data in the root PCollection + the subsequent DoFns wouldn't be a PCollection, it would be a ReadableSourceBundle (or something less wordy than that), so as to not have the issue of the invalid write() calls. But the core idea (the processing of the DoSomethingFn happening in memory in the mapper during the initialize() call) is the same.

          Gabriel Reid added a comment -

          So just to make sure I'm on the same page here: I'm thinking that in the MapsideJoin case, the way it would work today is like this:

          PTable<ImmutableBytesWritable,Result> htableContents = pipeline.read(FromHBase.table());
          PTable<A,B> convertedHTable = htableContents.parallelDo(new DoSomethingFn());
          PTable<A,Pair<C,B>> joined = new MapsideJoinStrategy().join(anotherPTable, convertedHTable);
          

          and this would have the drawback that creating convertedHTable would require a whole MR job to be kicked off, although what we want is to have the conversion to convertedHTable happen in the initialize method of the MapsideJoin to avoid kicking off the MR job.

          Wouldn't this be possible with something like a "materialized" PCollection, which could then operate in the same way as the in-memory PCollections? So then we would end up with something like this:

          PTable<ImmutableBytesWritable,Result> htableContents = pipeline.read(FromHBase.table());
          PTable<A,B> convertedHTable = new MaterializedPCollection(htableContents).parallelDo(new DoSomethingFn());
          PTable<A,Pair<C,B>> joined = new MapsideJoinStrategy().join(anotherPTable, convertedHTable);
          

          Then when materialize() was called on a MaterializedPCollection, we would just materialize the root PCollection, load everything into memory, and pass it through the rest of its pipeline in memory, so that the processing of the DoSomethingFn would occur in memory in the mapper. I guess that this would also imply that calling Pipeline#write on a MaterializedPCollection would throw an exception, unless there was some way of getting around that.

          Is that kind of what you had in mind? Or am I talking about something totally different?
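The fail-fast write semantics floated in this comment could be sketched roughly like this (hypothetical code; MaterializedPCollection was only a proposal, and all names here are illustrative, not an existing Crunch API):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the proposed "materialized collection" semantics:
// materializing runs everything downstream in memory, and writing is
// rejected because its meaning would be undefined.
public class MaterializedSketch<T> {
  private final List<T> rootContents;

  public MaterializedSketch(List<T> rootContents) {
    this.rootContents = rootContents;
  }

  // Materializing reads the root collection; any downstream DoFns would
  // then be applied to these values in memory.
  public List<T> materialize() {
    return rootContents;
  }

  // Writing a collection whose operations run in memory has no clear
  // meaning in the job plan, so fail fast rather than silently misbehave.
  public void write(String target) {
    throw new UnsupportedOperationException(
        "cannot write a materialized collection to " + target);
  }

  public static void main(String[] args) {
    MaterializedSketch<String> coll = new MaterializedSketch<>(Arrays.asList("a", "b"));
    System.out.println(coll.materialize().size()); // prints 2
    try {
      coll.write("hdfs://out");
    } catch (UnsupportedOperationException e) {
      System.out.println("write rejected"); // prints write rejected
    }
  }
}
```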

          Micah Whitacre added a comment -

          For example, think of being able to do mapside joins against (say) an HBase table, where you could construct the PTable of key-value pairs that is loaded in memory by reading the table into the client and then doing some processing on those values inside of the map initialization vs. having to run a MR job to process that data into a file as a pre-processing step to running the job. I'm not sure if that's the sort of thing folks would be interested in doing, but it seemed cool to me.

          Did someone give you a copy of our code? We don't do the mapside portion, but we have a number of use cases where the data should be small enough that we could do it mapside. Additionally, our APIs are written in the form of PTable<Avro,Avro>, so we usually have transformed the PTable<ImmutableBytesWritable, Result> from HBase into PTable<Avro,Avro> using simple MapFns before we would want to do the joins.

          I need to review the ReadableSourceBundle still but just wanted to confirm that the use case you were heading towards would definitely get used.

          Josh Wills added a comment -

          So I had two contexts in mind: in-memory for unit testing, but also having these DoFns running inside of a MR context, where they're not strictly part of the CrunchMapper/CrunchReducer flow, but operating more like embedded inside of the initialize() process that is reading records in from the distributed cache and then performing filters/transforms on them.

          For example, think of being able to do mapside joins against (say) an HBase table, where you could construct the PTable of key-value pairs that is loaded in memory by reading the table into the client and then doing some processing on those values inside of the map initialization vs. having to run a MR job to process that data into a file as a pre-processing step to running the job. I'm not sure if that's the sort of thing folks would be interested in doing, but it seemed cool to me.

          Gabriel Reid added a comment -

          Do you see the ReadableSourceBundle as always operating in an in-memory context? Because in that case, wouldn't it simply be a case of using a MemPipeline to pass the incoming values from the ReadableSource through the DoFns?

          Have you got a pretty good idea of how you would want the API to look?

          Josh Wills added a comment -

          Err, wait--- if I did that, we'd be making the ReadableSourceBundle very MR-aware. I wonder if there's a way around that...?

          Josh Wills added a comment -

          Roger on the additional docs for ReadableSourceBundle.

          Your assessment of where I was thinking of taking it is correct. My thought was that doing it might require an API change for ReadableSourceBundle.read() to take a TaskInputOutputContext (or something similar), so that the DoFns contained in the ReadableSourceBundle could have a proper setContext/initialize performed before they were called. What do you think of having the more complex API available now, even if we don't actually set it up to use the DoFns? (I agree with you that it will involve some additional complexity to make it work correctly.)

          Gabriel Reid added a comment -

I like the idea behind it, and it looks good to me. Definitely good to get rid of what was basically two different implementations of the MapsideJoinStrategy, and I have the feeling that this could be useful for a number of things outside of the context of mapside joins as well, even if I don't have a totally clear idea of what that would be exactly.

One thing I would mention is to add some docs on the ReadableSourceBundle, particularly on the configure method to specify where it gets called in the lifecycle. I understand how it works now, but I'm pretty sure I'll forget that before the next time I need to know it.

The idea of generalizing ReadableSourceBundle to allow including DoFns in it sounds interesting. If I think about it, it's a kind of optimization so that you could do something like filter/transform values from the in-memory side of the PCollection within the context of the MapsideJoin, instead of doing it within the context of an MR job leading up to the use of the MapsideJoin. Am I seeing that correctly? Or do you see new functionality that would be added that isn't possible at all right now? If it's just an optimization, maybe better to hold off on that for this ticket, as it seems like there could be a fair bit of extra complexity that could creep in there.

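(Editor's note.) The optimization Gabriel describes, filtering the small in-memory side while it is loaded for the map-side join rather than in a separate upstream MR job, amounts to something like the following. This is plain Java over collections, not Crunch code, and loadFiltered is a hypothetical helper name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class MapsideJoinSketch {
  // Build the small-side lookup table, applying the filter at load time
  // instead of in an upstream job.
  static <K, V> Map<K, V> loadFiltered(Iterable<Map.Entry<K, V>> smallSide,
                                       Predicate<V> keep) {
    Map<K, V> lookup = new HashMap<>();
    for (Map.Entry<K, V> e : smallSide) {
      if (keep.test(e.getValue())) {
        lookup.put(e.getKey(), e.getValue());
      }
    }
    return lookup;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> small = Arrays.asList(
        new AbstractMap.SimpleEntry<>("a", 1),
        new AbstractMap.SimpleEntry<>("b", 2),
        new AbstractMap.SimpleEntry<>("c", 3));
    Map<String, Integer> lookup = loadFiltered(small, v -> v >= 2);

    // "Map" phase of the join: stream the big side and probe the lookup table.
    List<String> joined = new ArrayList<>();
    for (String key : Arrays.asList("a", "b", "c", "b")) {
      Integer v = lookup.get(key);
      if (v != null) {
        joined.add(key + "=" + v);
      }
    }
    if (!joined.equals(Arrays.asList("b=2", "c=3", "b=2"))) {
      throw new AssertionError(joined);
    }
    System.out.println(joined);
  }
}
```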
          Josh Wills added a comment -

          Micah Whitacre and Gabriel Reid I'd be obliged if you guys could take a look at this first cut. I'd like to generalize it somewhat so that you could include DoFns inside of the ReadableSourceBundle<T> instances that could be used for filtering/transforming the map-side inputs.

I'm also not locked into the names or API structure here; if there's something you guys like better, please let me know.


            People

• Assignee:
  Josh Wills
• Reporter:
  Josh Wills
• Votes:
  0
• Watchers:
  3
