PIG-966

Proposed rework for LoadFunc, StoreFunc, and Slice/r interfaces

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: impl
    • Labels:
      None
    • Hadoop Flags:
      Incompatible change

      Description

      I propose that we rework the LoadFunc, StoreFunc, and Slice/r interfaces significantly. See http://wiki.apache.org/pig/LoadStoreRedesignProposal for full details


          Activity

          Dmitriy V. Ryaboy added a comment -

          The comments below are from both me and Ashutosh.

          We'd like to preface this with saying that we think overall, the proposed changes are very useful and important, and are likely to result in significantly reducing the barriers to Pig adoption in the broader Hadoop user community.

          There are a lot of suggestions and critiques below, but that's just because we care.

          On to the notes.

          Names of interfaces

          Can you explain why everything has a Load prefix? Seems like this limits the interfaces unnecessarily, and is a bit inconsistent semantically (LoadMetadata does not represent metadata associated with loading – it loads metadata. LoadStatistics does not load statistics; it represents statistics, and is loaded using LoadMetadata).

          How about:
          LoadCaster -> PigTypeCaster,

          LoadPushDown -> Filterable, Projectionable (the latter may need a better name)
          (clearly, we are also suggesting breaking down the interface into multiple interfaces – more on that later)

          LoadSchema -> ResourceSchema
          LoadFieldSchema -> FieldSchema or ResourceFieldSchema
          LoadMetadata -> MetadataReader
          StoreMetadata -> MetadataWriter
          LoadStatistics -> ResourceStatistics

          LoadFunc

          In regards to the appropriate parameters for setURI – can you explain the advantage of this over Strings in more detail? I think the current setLocation approach is preferable; it gives users more flexibility. Plus Hadoop Paths are constructed from strings, not URIs, so we are forcing a string->uri->string conversion on the common case.

          The getLoadCaster method – perhaps getTypeCaster or getPigTypeCaster is a better name?

          prepareToRead: does it need a finishReading() mate?

          I would like to see a "standard" method for getting the jobconf (or whatever it is called in 20/21), both for LoadFunc and StoreFunc.

          LoadCaster (Or PigTypeCaster..)

          This interface is implemented by UTF8StorageConverter. Let's decide on what these are – casters or converters – and use one term.

          LoadMetadata (or MetadataLoader)

          Some thoughts on the problem of what happens when the loader is loading multiple resources or a resource with multiple partitions.

          We think that the schema should be uniform for everything a single instance of a loader is responsible for loading (and the loader can fill in null or defaults where appropriate if some resources are missing fields).

          Statistics should be aggregated, since the collection of resources will be treated as one (knowledge of relevant partitions would be used by a Filterable/Projectionable/Pushdownable loader to push selections down, not, I think, by downstream operators).

          So we have two options.

          In option 1, getStatistics would return a collection (lower c) of stats associated with the resources that the loader is loading, perhaps as a Map of String->ResourceStatistics. These would need to go through a stat aggregator of some sort that would know how to deal with unifying statistics across multiple resources in a generic way.

          In option 2, getStatistics would be responsible for its own implementation of aggregation, which would give it flexibility in terms of how such aggregation is done. Since we don't expect many stat stores, this seems preferable to us, as generic aggregation is going to be hard to get right.

          (of course there is option 3, where we have a default stat aggregator class that can be extended/overridden by individual MetadataLoaders, but I imagine this would be a hard sell).
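          To make option 2 concrete, here is a rough sketch of a loader-side aggregator; all class and field names are made up for illustration and are not part of the proposal. Summable stats (record counts, byte sizes) aggregate trivially; the point is that anything trickier stays the loader's problem rather than requiring a generic aggregator.

```java
import java.util.List;

// Hypothetical sketch of option 2: the loader's metadata reader owns the
// aggregation of per-resource statistics, so Pig only ever sees one
// ResourceStatistics for the whole collection of resources.
public class StatsAggregationSketch {

    static class ResourceStatistics {
        long numRecords;
        long sizeInBytes;
        ResourceStatistics(long numRecords, long sizeInBytes) {
            this.numRecords = numRecords;
            this.sizeInBytes = sizeInBytes;
        }
    }

    // Record counts and byte sizes sum naturally; other stats (e.g.
    // distinct value counts) would need loader-specific logic, which is
    // exactly why pushing aggregation into the loader is attractive.
    static ResourceStatistics aggregate(List<ResourceStatistics> perResource) {
        long records = 0, bytes = 0;
        for (ResourceStatistics s : perResource) {
            records += s.numRecords;
            bytes += s.sizeInBytes;
        }
        return new ResourceStatistics(records, bytes);
    }
}
```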

          LoadSchema (or ResourceSchema)

          Should org.apache.pig.impl.logicalLayer.schema.Schema be changed to use this as an internal representation?

          I like how sort information is handled here. Perhaps we can consider using this approach instead of SortInfo in PIG-953
          If PigSchema implements or contains ResourceSchema, SortInfo will no longer be needed.

          PartitionKeys aren't really part of schema; they are a storage/distribution property. This should go into the Metadata and refer to the schema.

          LoadStatistics (or ResourceStatistics)

          Why the public fields? Not that I am a huge fan of getters and setters but I sense findbugs warnings heading our way.

          We need to account for some statistics being missing. What should numRecords be set to if we don't know the number of records?

          We can use Long and set it to null;
          we can use a magic value (-1?);
          we can wrap in a getter and throw an exception (ugh).

          I had envisioned statistics as more of a key-value thing, with some keys predefined in a separate class. So we would have:

          ResourceStats.NUM_RECORDS
          ResourceStats.SIZE_IN_BYTES
          //etc
          

          and to get the stats we would call

          MyResourceStats.getLong(ResourceStats.NUM_RECORDS)
          MyResourceStats.getFloat(ResourceStats.SOMETHING_THAT_IS_A_FLOAT)
          //etc
          

          This allows us to be far more flexible in regards to the things marked as "//probably more here."
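          A minimal sketch of what the key-value shape could look like; the class layout and key spellings are assumptions, only the NUM_RECORDS / SIZE_IN_BYTES names come from the suggestion above. Returning null from a typed getter also gives one answer to the missing-statistics question raised earlier.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the key-value statistics idea. Well-known keys
// are predefined constants; unknown/extra stats can be added without
// changing the class.
public class ResourceStats {
    public static final String NUM_RECORDS = "num.records";
    public static final String SIZE_IN_BYTES = "size.in.bytes";

    private final Map<String, Object> stats = new HashMap<>();

    public void set(String key, Object value) { stats.put(key, value); }

    // Typed getters return null when a statistic is simply not known,
    // avoiding magic values like -1.
    public Long getLong(String key)   { return (Long) stats.get(key); }
    public Float getFloat(String key) { return (Float) stats.get(key); }
}
```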

          Something that is probably worth its own topic: we have been considering extending Pig Latin to allow users to specify some dataset properties on the fly. For example, when performing a filter, a user can help planning by saying something like

          bar = FILTER foo BY myUdf($1) EXPECT SELECTIVITY=0.95 -- just getting rid of some noise!
          foobar = JOIN foo by id, bar by id EXPECT SELECTIVITY=0.1 -- small cross-section!
          

          Having the user provide a rough estimate of operator selectivity would be far more effective than trying to estimate result sizes based on stats. In fact, no less an authority than Microsoft's Surajit Chaudhuri advocates opening up these sorts of communication channels in SQL to help optimizers do a better job (see his SIGMOD 2009 paper).

          This implies that statistics can (and should) be associated not just with loaded resources, but with operator outputs.

          LoadPushdown

          As alluded to above, I am not sure this is a good interface. The idea is that we allow users to define which operations can be pushed down to them; but the concept of a push down is really a Pig concept, not a Load concept. I think breaking this out into two interfaces would be more advisable.

          So, I propose instead

          interface Filterable {
          
           /**
           * checks if the plan consists of operations that a loader can push down to the physical level
           */
           public boolean checkFilterPlan(OperatorPlan plan);
          
           /**
           *  sets a filter plan that will be pushed down to this loader.
           *  throws exceptions if the plan is poorly formed (what does this mean? tbd) or
           *  if this loader cannot accept this plan for some reason
           */
           public void setFilterPlan(OperatorPlan plan) throws MalformedPlanException, UnacceptablePlanException;
          
           /**
           *  same as setFilterPlan, but allows multiple filters to be applied in order.
           */
           public void addFilterPlan(OperatorPlan plan) throws MalformedPlanException, UnacceptablePlanException;
          
          }
          
          

          I took out getFeatures as it's implied by implementing one or the other interface (or both).

          We were hoping a logical plan would be OK for communicating the filter plan. Agreed that passing SQL and having users parse it is kludgy. Can't a simple AST be represented as a (very plain) LogicalPlan? Not sure what our alternatives are here.

          The Projectionable (or Selectable?) interface below is pretty simplified – it only allows specifying which columns are needed, and does not do things like applying arbitrary UDFs, etc. I think this is better because it is very straightforward to implement. I expect that the common case for projection pushdown is a column-store storage engine; in that case all one really needs to know is the indexes of the requested columns to avoid touching irrelevant data. There may be some edge-case on-disk structures that allow pushing down projections of nested data, or even some special UDFs (materialized views?), but it seems like they are not worth the added complexity of requiring Pig users to deal with OperatorPlans. This is not avoidable for filters, but I think we can make life easier for folks with projections.

          interface Projectionable {
          
           /**
           *  sets the fields that this loader should return.
           *  @param keys an array of indexes into the Loader's true schema. For example, if keys is set to
           *  [2,4,1], then the tuple returned by loader.getNext() will have three fields: the second, fourth, and first.
           */
           public void setProjectionKeys(int[] keys) throws IllegalArgumentException;
          
          }
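          For illustration, here is roughly what implementing the sketched Projectionable interface might look like for a hypothetical column-store loader. Both classes are made-up names, and the interface is repeated inline so the sketch is self-contained:

```java
// Hypothetical column-store loader implementing the Projectionable sketch;
// nothing here is real Pig code.
public class ColumnStoreExample {

    interface Projectionable {
        void setProjectionKeys(int[] keys) throws IllegalArgumentException;
    }

    static class ColumnStoreLoader implements Projectionable {
        private int[] wantedColumns;    // indexes into the on-disk schema
        private final int schemaWidth;

        ColumnStoreLoader(int schemaWidth) { this.schemaWidth = schemaWidth; }

        @Override
        public void setProjectionKeys(int[] keys) {
            for (int k : keys) {
                if (k < 0 || k >= schemaWidth) {
                    throw new IllegalArgumentException("no such column: " + k);
                }
            }
            // A real loader would now arrange to read only these column
            // files and skip the rest of the data entirely.
            this.wantedColumns = keys.clone();
        }

        int[] getWantedColumns() { return wantedColumns; }
    }
}
```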
          

          StoreFunc

          setURI – same comment as for LoadFunc's setURI: a location (as a String) seems better.

          prepareToWrite – does it need a close() / finish() mate?

          StoreMetadata (or, MetadataWriter)

          Where does one specify what MetadataWriter to use? Is it inside the StoreFunc? In that case StoreFunc needs a method to return its implementation of MetadataWriter. Is it global? Then we need to specify how it gets set. Same applies to MetadataReader and LoadFunc.

          Since we may want to insulate the Pig user from the metadata business unless they really want to get into it, it probably makes sense to assume a global default Pig Metadata Repository. In cases where the user writes a loader for data managed by something else (Hive, HBase), they can specify a metadataReader at the loader. We may also want to allow them to specify the metadataReader at the script level:

          a = load '/dw/hive/foo' using HiveLoader() with metadataReader HiveMetadataReader(some params) as (....)
          

          I don't really have any particularly good ideas on the questions you raise regarding the specifics of how the interface should work.

          InputFormatLoader:

          I think the types table can be extended to support ArrayWritable and MapWritable as long as array members and key/value types are among the
          types listed.

          As far as needing to do something special for loaders like BinStorage and JSONLoader – can't they get an underlying inputformat on the front end the same way the side files are proposed to be handled (new instance of IF, getSplits, RecordReader, read)?
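          The front-end pattern being suggested here (new instance of the InputFormat, getSplits, open a RecordReader per split, read) can be sketched with toy stand-ins. These are not the real Hadoop classes, just minimal shapes to show the control flow:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Minimal stand-ins for Hadoop's InputFormat / RecordReader, illustrating
// the front-end reading pattern: instantiate the format, ask it for
// splits, open a reader per split, and pull records.
public class FrontEndReadSketch {

    interface InputFormat<T> {
        List<String> getSplits();                  // real code returns InputSplit[]
        Iterator<T> createRecordReader(String split);
    }

    // Toy format: each "split" is a comma-separated string of records.
    static class CsvLineFormat implements InputFormat<String> {
        private final List<String> splits;
        CsvLineFormat(List<String> splits) { this.splits = splits; }
        public List<String> getSplits() { return splits; }
        public Iterator<String> createRecordReader(String split) {
            return Arrays.asList(split.split(",")).iterator();
        }
    }

    // The pattern BinStorage/JSONLoader could use on the front end:
    static int countRecords(InputFormat<String> fmt) {
        int n = 0;
        for (String split : fmt.getSplits()) {
            Iterator<String> reader = fmt.createRecordReader(split);
            while (reader.hasNext()) { reader.next(); n++; }
        }
        return n;
    }
}
```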

          Regarding using InputFormat et al without Hadoop:

          I can't claim to have a detailed understanding of the underlying issues, but it seems to me that those things are just interfaces and can be divorced from HDFS by creating implementations that deal with the local filesystem directly. Or is the idea to be able to run completely without the Hadoop libraries?

          Whew. That's all for now.

          Thanks for reading the lengthy comments, and looking forward to continued discussion.

          Alan Gates added a comment -

          Responses to Dmitriy's and Ashutosh's comments:

          Can you explain why everything has a Load prefix? Seems like this limits the interfaces unnecessarily, and is a bit inconsistent semantically (LoadMetadata does not represent metadata associated with loading - it loads metadata. LoadStatistics does not load statistics; it represents statistics, and is loaded using LoadMetadata).

          I don't claim to be a naming guru, so I'm open to other naming suggestions. I chose to prefix all of the interfaces with Load or Store to show that they were related to Load and Store. For example, by calling it LoadMetadata I
          did intend to show explicitly that this is metadata associated with loading. I agree that naming schemas and statistics something other than Load is good, because they aren't used solely for loading.

          In regards to the appropriate parameters for setURI - can you explain the advantage of this over Strings in more detail? I think the current setLocation approach is preferable; it gives users more flexibility. Plus Hadoop Paths are constructed from strings, not URIs, so we are forcing a string->uri->string conversion on the common case.

          The real concern I have here is I want Pig to be able to distinguish when users intend to refer to a filename and when they don't. This is important because Pig sometimes munges file names. Consider the following Pig
          Latin script:

          cd '/user/gates';
          A = load './bla';
          ...
          Z = limit Y 10;
          cd '/tmp';
          dump Z;
          

          By the time Pig evaluates Z for dumping, ./bla will have a different meaning than it did when the user typed it. Pig understands that and transforms the load statement to load '/user/gates/bla'. But it needs to know not
          to mess with statements like:

          A = load 'http://...';
          

          By explicitly making the location a URI we encourage users and load function writers to think this way. Your argument that Hadoop paths are by default strings is persuasive. Perhaps it's best to leave this as strings but look for a scheme at the beginning and interpret it as a URI if it has one (which is what Pig does now).
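          That scheme-sniffing behavior is easy to sketch with plain java.net.URI; the helper below is illustrative only, not Pig code:

```java
import java.net.URI;

// Hypothetical helper: treat a load location as a URI only when it carries
// an explicit scheme; otherwise it's a plain path that Pig is free to
// resolve against the script's current directory.
public class LocationUtil {

    public static boolean hasScheme(String location) {
        return URI.create(location).getScheme() != null;
    }

    public static String resolve(String cwd, String location) {
        if (hasScheme(location)) {
            return location;   // e.g. http://... or hdfs://... stays untouched
        }
        if (location.startsWith("/")) {
            return location;   // already absolute
        }
        // Relative path: resolve against the current directory, as in the
        // cd '/user/gates'; load './bla' example above.
        return cwd + "/" + location.replaceFirst("^\\./", "");
    }
}
```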

          prepareToRead: does it need a finishReading() mate?

          A good idea. Same for finishWriting() below.

          I would like to see a "standard" method for getting the jobconf (or whatever it is called in 20/21), both for LoadFunc and StoreFunc.

          I agree, but I didn't take that on here. We need a standard way to move configuration information (Hadoop and Pig) into Load, Store, and Eval Funcs. But I viewed that as a separate issue that should be solved for all UDFs.

          We think that the schema should be uniform for everything a single instance of a loader is responsible for loading (and the loader can fill in null or defaults where appropriate if some resources are missing fields).

          Agreed, that is what I was trying to say. Perhaps it wasn't clear.

          Should org.apache.pig.impl.logicalLayer.schema.Schema be changed to use this as an internal representation?

          No. It serves a different purpose, which is to define the content of data flows inside the logical plan. We should not tie these two together.

          PartitionKeys aren't really part of schema; they are a storage/distribution property. This should go into the Metadata and refer to the schema.

          We need partition keys as part of this interface, as Pig will need to be able to pass partition keys to loaders that are capable of doing partition pruning. So we could add getPartitionKeys to the LoadMetadata interface.
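          A toy sketch of why that matters: once Pig can ask the loader for its partition keys, a filter on one of those keys turns into partition pruning inside the loader. The types and names below are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of loader-side partition pruning. In the real design,
// getPartitionKeys() on the metadata interface would tell Pig which
// fields (e.g. "date") identify partitions; Pig can then hand an
// equality filter on such a field to the loader instead of applying it
// to every record.
public class PartitionPruneSketch {

    static List<Map<String, String>> prune(
            List<Map<String, String>> partitions, String key, String wanted) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            if (wanted.equals(p.get(key))) {
                kept.add(p);   // only this partition's files get read at all
            }
        }
        return kept;
    }
}
```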

          Why the public fields? Not that I am a huge fan of getters and setters but I sense findbugs warnings heading our way.

          LoadSchema and LoadStatistics as proposed are structs. I don't see any reason to pretend otherwise. And I'm not inclined to bend my programming style to match that of whoever wrote findbugs.

          I had envisioned statistics as more of a key-value thing, with some keys predefined in a separate class. So we would have:

          ResourceStats.NUM_RECORDS
          ResourceStats.SIZE_IN_BYTES
          //etc

          and to get the stats we would call

          MyResourceStats.getLong(ResourceStats.NUM_RECORDS)
          MyResourceStats.getFloat(ResourceStats.SOMETHING_THAT_IS_A_FLOAT)
          //etc

          This allows us to be far more flexible in regards to the things marked as "//probably more here."

          The problem with key/value setups like this is that it can be hard for people to understand what is already there. So they end up not using what already exists, or, worse, re-inventing the wheel. My hope is that by versioning this we can get around the need for the key/value approach.

          As alluded to above, I am not sure this is a good interface. The idea is that we allow users to define which operations can be pushed down to them; but the concept of a push down is really a Pig concept, not a Load concept. I think breaking this out into two interfaces would be more advisable.

          So what happens tomorrow when some loaders can do merge joins on sorted data? Now we have to have another interface. I want this to be easily extensible.
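          One way to keep a single interface extensible, sketched here with made-up names: advertise capabilities through an enum, so a new capability like merge join on sorted data is one new enum constant rather than a new Java interface:

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative alternative to one-interface-per-capability: a single
// push-down interface whose implementors declare what they support.
// Adding a capability tomorrow means adding an enum constant, not an
// interface.
public class PushDownFeaturesSketch {

    enum Feature { FILTER, PROJECTION, SORTED_MERGE_JOIN }

    interface PushDownCapable {
        Set<Feature> getFeatures();
    }

    static class ColumnStoreLoader implements PushDownCapable {
        public Set<Feature> getFeatures() {
            // This loader can skip rows and columns, but its data isn't sorted.
            return EnumSet.of(Feature.FILTER, Feature.PROJECTION);
        }
    }
}
```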

          Where does one specify what MetadataWriter to use? Is it inside the StoreFunc? In that case StoreFunc needs a method to return its implementation of MetadataWriter. Is it global? Then we need to specify how it gets set. Same applies to MetadataReader and LoadFunc.

          I'm assuming that a given StoreFunc is tied to a particular metadata instance, so it would return its implementation of StoreMetadata. This, and the related proposal for a metadata interface (see PIG-967) seek to insulate Pig from the metadata system. But I am not assuming that the
          loader and store functions themselves will be insulated. Those, I'm asserting, will be metadata system specific. I don't see how we can avoid it, as they'll need to do schema and statistics translations, possibly data type
          translations, etc.

          For thoughts on having a default metadata repository, see PIG-967 and the associated wiki page, which discusses that.

          I think the types table can be extended to support ArrayWritable and MapWritable as long as array members and key/value types are among the types listed.

          Probably, I'll take a look.

          As far as needing to do something special for loaders like BinStorage and JSONLoader - can't they get an underlying inputformat on the front end the same way the side files are proposed to be handled (new instance of IF, getSplits, RecordReader, read)?

          Probably.

          I can't claim to have a detailed understanding of the underlying issues, but it seems to me that those things are just interfaces and can be divorced from HDFS by creating implementations that deal with the local filesystem directly. Or is the idea to be able to run completely without the Hadoop libraries?

          I'm not proposing that Pig is able to run completely without Hadoop libraries. And I'm guessing that we can use the HDFS implementations on the local file system. But I don't know it for certain. That's a loose end we need to
          tie up before we declare this to be the plan.

          Show
          Alan Gates added a comment -

          Responses to Dmitriy's and Ashutosh's comments:

          Can you explain why everything has a Load prefix? Seems like this limits the interfaces unnecessarily, and is a bit inconsistent semantically (LoadMetadata does not represent metadata associated with loading - it loads metadata. LoadStatistics does not load statistics; it represents statistics, and is loaded using LoadMetadata).

          I don't claim to be a naming guru, so I'm open to other naming suggestions. I chose to prefix all of the interfaces with Load or Store to show that they were related to Load and Store. For example, by calling it LoadMetadata I did intend to show explicitly that this is metadata associated with loading. I agree that naming schemas and statistics something other than Load is good, because they aren't used solely for loading.

          In regards to the appropriate parameters for setURI - can you explain the advantage of this over Strings in more detail? I think the current setLocation approach is preferable; it gives users more flexibility. Plus Hadoop Paths are constructed from strings, not URIs, so we are forcing a string->uri->string conversion on the common case.

          The real concern I have here is that I want Pig to be able to distinguish when users intend to refer to a filename and when they don't. This is important because Pig sometimes munges file names. Consider the following Pig Latin script:

          cd '/user/gates';
          A = load './bla';
          ...
          Z = limit Y 10;
          cd '/tmp';
          dump Z;

          By the time Pig evaluates Z for dumping, ./bla will have a different meaning than it did when the user typed it. Pig understands that and transforms the load statement to load '/user/gates/bla'. But it needs to know not to mess with statements like:

          A = load 'http://...';

          By explicitly making the location a URI we encourage users and load function writers to think this way. Your argument that Hadoop paths are by default strings is persuasive. Perhaps it's best to leave this as strings but look for a scheme at the beginning and interpret it as a URI if it has one (which is what Pig does now).

          prepareToRead: does it need a finishReading() mate?

          A good idea. Same for finishWriting() below.

          I would like to see a "standard" method for getting the jobconf (or whatever it is called in 20/21), both for LoadFunc and StoreFunc.

          I agree, but I didn't take that on here. We need a standard way to move configuration information (Hadoop and Pig) into Load, Store, and Eval Funcs. But I viewed that as a separate issue that should be solved for all UDFs.

          We think that the schema should be uniform for everything a single instance of a loader is responsible for loading (and the loader can fill in null or defaults where appropriate if some resources are missing fields).

          Agreed, that is what I was trying to say. Perhaps it wasn't clear.

          Should org.apache.pig.impl.logicalLayer.schema.Schema be changed to use this as an internal representation?

          No. It serves a different purpose, which is to define the content of data flows inside the logical plan. We should not tie these two together.

          PartitionKeys aren't really part of schema; they are a storage/distribution property. This should go into the Metadata and refer to the schema.

          We need partition keys as part of this interface, as Pig will need to be able to pass partition keys to loaders that are capable of doing partition pruning. So we could add getPartitionKeys to the LoadMetadata interface.

          Why the public fields? Not that I am a huge fan of getters and setters but I sense findbugs warnings heading our way.

          LoadSchema and LoadStatistics as proposed are structs. I don't see any reason to pretend otherwise. And I'm not inclined to bend my programming style to match that of whoever wrote findbugs.

          I had envisioned statistics as more of a key-value thing, with some keys predefined in a separate class. So we would have:

          ResourceStats.NUM_RECORDS
          ResourceStats.SIZE_IN_BYTES
          // etc

          and to get the stats we would call:

          MyResourceStats.getLong(ResourceStats.NUM_RECORDS)
          MyResourceStats.getFloat(ResourceStats.SOMETHING_THAT_IS_A_FLOAT)
          // etc

          This allows us to be far more flexible in regards to the things marked as "// probably more here."

          The problem with key/value setups like this is it can be hard for people to understand what is already there. So they end up not using what already exists, or worse, re-inventing the wheel. My hope is that by versioning this we can get around the need for this key/value stuff.

          As alluded to above, I am not sure this is a good interface. The idea is that we allow users to define which operations can be pushed down to them; but the concept of a push down is really a Pig concept, not a Load concept. I think breaking this out into two interfaces would be more advisable.

          So what happens tomorrow when some loaders can do merge joins on sorted data? Now we have to have another interface. I want this to be easily extensible.

          Where does one specify what MetadataWriter to use? Is it inside the StoreFunc? In that case StoreFunc needs a method to return its implementation of MetadataWriter. Is it global? Then we need to specify how it gets set. Same applies to MetadataReader and LoadFunc.

          I'm assuming that a given StoreFunc is tied to a particular metadata instance, so it would return its implementation of StoreMetadata. This, and the related proposal for a metadata interface (see PIG-967), seek to insulate Pig from the metadata system. But I am not assuming that the loader and store functions themselves will be insulated. Those, I'm asserting, will be metadata system specific. I don't see how we can avoid it, as they'll need to do schema and statistics translations, possibly data type translations, etc. For thoughts on having a default metadata repository, see PIG-967 and the associated wiki page, which discusses that.

          I think the types table can be extended to support ArrayWritable and MapWritable as long as array members and key/value types are among the types listed.

          Probably, I'll take a look.

          As far as needing to do something special for loaders like BinStorage and JSONLoader - can't they get an underlying inputformat on the front end the same way the side files are proposed to be handled (new instance of IF, getSplits, RecordReader, read)?

          Probably. I can't claim to have a detailed understanding of the underlying issues, but it seems to me that those things are just interfaces and can be divorced from HDFS by creating implementations that deal with the local filesystem directly.

          Or is the idea to be able to run completely without the Hadoop libraries?

          I'm not proposing that Pig is able to run completely without Hadoop libraries. And I'm guessing that we can use the HDFS implementations on the local file system. But I don't know it for certain. That's a loose end we need to tie up before we declare this to be the plan.
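The "leave this as strings but look for a scheme at the beginning" approach could be sketched as below; the class name and regex are illustrative assumptions, not actual Pig code.

```java
import java.util.regex.Pattern;

// Hypothetical helper: decide whether a location string carries a URI scheme,
// so Pig knows when it is safe to munge relative paths like './bla'.
public class SchemeDetector {
    // A scheme per RFC 3986: a letter, then letters/digits/'+'/'-'/'.', then "://".
    private static final Pattern SCHEME =
        Pattern.compile("^[a-zA-Z][a-zA-Z0-9+.-]*://.*");

    public static boolean hasScheme(String location) {
        return SCHEME.matcher(location).matches();
    }
}
```

Locations without a scheme ("./bla", "/user/gates/bla") would be treated as filenames and made absolute; anything with a scheme ("http://...", "hdfs://...") would be passed through untouched.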
          Hide
          Dmitriy V. Ryaboy added a comment -

          Hi Alan,
          Responses to responses:

          Perhaps it's best to leave this as strings but look for a scheme at the beginning and interpret it as a URI if it has one (which is what Pig does now).

          I understand the motivation more clearly now, thanks for the explanation. Agreed with the quoted approach.

          [regarding single schema for partitioned datasets] Agreed, that is what I was trying to say. Perhaps it wasn't clear.

          Nope, it was clear, I just have a very verbose way of saying "yes".

          Regarding merging the Schemas you said:

          No. It serves a different purpose, which is to define the content of data flows inside the logical plan. We should not tie these two together.

          I don't really understand the difference, but I defer to your superior knowledge of the codebase and accept your decision.

          I'm not inclined to bend my programming style to match that of whoever wrote findbugs.

          +9.3 from the Russian judge. Gleefully accepted.

          We need partition keys as part of this interface, as Pig will need to be able to pass partition keys to loaders that are capable of doing partition pruning. So we could add getPartitionKeys to the LoadMetadata interface.

          That's precisely what I am suggesting – take it out of Schema, put it in LoadMetadata (or MetadataReader, as I like to call it....).

          The problem with key/value setups like this is it can be hard for people to understand what is already there. So they end up not using what already exists, or worse, re-inventing the wheel. My hope is that by versioning this we can get around the need for this key/value stuff.

          Hm, I see your point. I am interested in being able to augment the set of available statistics without requiring changes to the base classes, however. I guess that's where inheritance comes in handy. Any comments on how to handle missing data? Primitive types still don't work for that.

          So what happens tomorrow when some loaders can do merge joins on sorted data? Now we have to have another interface. I want this to be easily extensible.

          I must not be clear on what pushing down to a loader does. My interpretation was that it allows pushing down operations to the point where you don't read unnecessary data off disk. A classic example of filter pushdown would be filtering by a partition key (say, dt > sysdate-30, with our data stored in one file per day). An example of projection pushdown is when we have a column store that simply avoids loading some of the columns.

          I don't see how a loader can push down a join. That seems to require reading and changing data. Is the idea that such a join can be performed without an MR step? That seems like a Pig thing, not a loader thing.

          In any case, yes, I think something like this would require a new interface in the same namespace, since it's a drastically different capability.

          Any thoughts on advisability of simplifying projection pushdown to just work on an int array? I know it's limiting, but it's going to be a heck of a lot easier for users to implement.

          I'm assuming that a given StoreFunc is tied to a particular metadata instance, so it would return its implementation of StoreMetadata.

          I was assuming that Pig would have a preferred metadata store (such as Owl), and it would attempt to use it unless instructed otherwise. We could even try some cascading thing: if the user specifies a metadata store on the command line, use that; if not, see whether the loader suggests one; if not, use Owl; if owl doesn't have anything, see if it's an file in a known scheme (hdfs, file, s3n...) and at least get some file-level metadata such as create date and size. StoreMetadata can do the same (except for hdfs part).
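The cascade described above could be sketched as follows; the class, method, and store names ("cli", "owl", "file-level") are stand-ins for illustration, not an actual Pig API.

```java
// Hypothetical resolution order for picking a metadata reader:
// command line > loader suggestion > site default (e.g. Owl) > file-level fallback.
public class MetadataResolver {
    public static String resolve(String fromCommandLine,
                                 String suggestedByLoader,
                                 String siteDefault) {
        if (fromCommandLine != null) return fromCommandLine;     // user override wins
        if (suggestedByLoader != null) return suggestedByLoader; // loader's preference
        if (siteDefault != null) return siteDefault;             // e.g. Owl
        // Last resort: basic metadata (create date, size) from a known
        // filesystem scheme such as hdfs, file, or s3n.
        return "file-level";
    }
}
```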

          I'll take another look at PIG-967.

          Alan Gates added a comment -

          I must not be clear on what pushing down to a loader does. My interpretation was that it allows pushing down operations to the point where you don't read unnecessary data off disk. A classic example of filter pushdown would be filtering by a partition key (say, dt > sysdate-30, with our data stored in one file per day). An example of projection pushdown is when we have a column store that simply avoids loading some of the columns.

          I don't see how a loader can push down a join. That seems to require reading and changing data. Is the idea that such a join can be performed without an MR step? That seems like a Pig thing, not a loader thing.

          In any case, yes, I think something like this would require a new interface in the same namespace, since it's a drastically different capability.

          Any thoughts on advisability of simplifying projection pushdown to just work on an int array? I know it's limiting, but it's going to be a heck of a lot easier for users to implement.

          Limiting the data you need to read off disk is partition pruning, or in the case of columnar stores, column pruning. But this isn't the only case in which you might want to push down operators. Consider
          data that has (name, age, address) and is partitioned on name. A user might want to query only over adults (age > 17). This isn't a partition field. But if it's a columnar store and age is compressed in
          say run length or offset encoding the load function may be able to apply the filter on the compressed data. This can be a huge win, as we avoid decompressing whole rows that we don't need. To see another
          case where we might want to push operators to the loader, consider the case where a user is loading a set of Zebra files, all of which are sorted on one key. Pig may want to keep those zebra files
          sorted. It will need a way to tell the loader to merge those files as it loads them rather than concatenate them and force Pig to resort the input.

          I understand your concern on making it difficult to pass down just projection. And you are not the only one to express this concern. Though even there for full projections, we need more than a simple int array, so that we can
          handle things like map, bag, etc. projections. But maybe we need a simpler option for users who just want to push projection and then the full blown option for power users who want to push selection, etc.
          Beginner and advanced interfaces I guess.
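The "beginner" option discussed above (a simple int-array projection) could look something like this; the interface and class names are assumptions for illustration, not Pig's actual API.

```java
import java.util.Arrays;

// Sketch of a minimal projection-pushdown contract: the loader is told,
// by position, which columns the script actually uses.
interface SimpleProjectionPushdown {
    void pushProjection(int[] requiredColumns);
}

// A toy column-store loader that just remembers which columns were requested,
// so it can skip reading the rest off disk.
class RecordingLoader implements SimpleProjectionPushdown {
    int[] columns;

    public void pushProjection(int[] requiredColumns) {
        columns = Arrays.copyOf(requiredColumns, requiredColumns.length);
    }
}
```

The "advanced" option for selection pushdown, merge-on-load, and nested (map/bag) projections would need a richer structure than an int array, as Alan notes.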

          Alan Gates added a comment -

          In thinking about it more, it becomes obvious that we have to separate out determining the partition keys for an input from getting the schema, as Dmitriy and Ashutosh suggested above. The reason is that Pig cannot ask the loader for a schema until it has completely defined what will be loaded (because the schema will depend on what is being loaded). And to completely define what is being loaded it needs to determine the partition keys and possibly specify a filter condition for them. So we need to add a getPartitionKeys and setPartitionFilter to the LoadMetadata interface.

          Dmitriy V. Ryaboy added a comment -

          Alan, thanks for the explanation on the kinds of pushdowns you are envisioning. This makes sense, although I have a feeling that if we get this complex with pushdowns, it may be more appropriate to start thinking of interfaces that expose different access paths, rather than "pushdownable operations".

          Starting to think perhaps you are right in wanting to make this a single interface instead of multiple ones like I suggested.

          A couple more thoughts on the LoadPushdown interface.

          getFeatures() should probably return a Set, not a List, as duplicates don't really make sense and we want fast contains() calls on the returned object.

          The new idea is just a small tweak on your design that aims to avoid the OperatorPlan issue.

          Maintain a Set of LogicalOperator classes (as in, LOProject.class) to indicate acceptable operators, and provide a pushOperator(LogicalOperator op) method, which can be called multiple times. If the order of operators matters, it should be up to whoever is calling this method to do so in the right order.

          This does force LoadFunc implementations to understand Pig operator classes, and in the case of Filter it does have to deal with an inner LogicalPlan, but I think those classes are mostly ok. If someone is advanced enough to want to implement pushdowns, they can handle those interfaces. There is the danger of the interfaces changing, of course, but, well, that consideration hasn't stopped Hadoop... and we are setting a precedent by breaking the LoadFunc interface right now anyway.

          Too simple?
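Dmitriy's pushOperator idea could be sketched as below; the Operator and Project classes here are stand-ins for Pig's LogicalOperator/LOProject hierarchy, and all names are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-ins for Pig's logical operator classes.
class Operator {}                    // stand-in for LogicalOperator
class Project extends Operator {}    // stand-in for LOProject

// The loader advertises which operator classes it accepts, and the caller
// pushes instances one at a time, in whatever order it deems correct.
abstract class PushdownLoader {
    abstract Set<Class<? extends Operator>> acceptedOperators();

    // Returns true if the operator was accepted and will be applied at load time.
    boolean pushOperator(Operator op) {
        return acceptedOperators().contains(op.getClass());
    }
}

// A loader that can absorb projections but nothing else.
class ProjectOnlyLoader extends PushdownLoader {
    Set<Class<? extends Operator>> acceptedOperators() {
        Set<Class<? extends Operator>> s = new HashSet<>();
        s.add(Project.class);
        return s;
    }
}
```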

          Dmitriy V. Ryaboy added a comment -

          Regarding histogram representation:

          I took a look at how Postgres does it, and they simply store 3 arrays:

          • An array of "Most Common Values", which contains exactly what it sounds like, ordered in decreasing frequency
          • A matching array of frequencies, expressed as a fraction of the total row count in the relation.
          • An array of sorted values chosen in such a way that the number of rows with values between A[i] and A[i+1] is roughly the same for all i. An interesting optimization they perform is that if the most common values array described above is defined for this field, then the values in that array are not included when calculating the boundaries for the histogram. They say that's called a "compressed histogram", if someone wants to dig up some papers on this.

          Any objections to this design?
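The three Postgres-style arrays could be carried in a struct like the one below; the class and field names are assumptions for illustration, not part of the proposal.

```java
// Sketch of Postgres-style column statistics: most-common values, their
// frequencies as fractions of the row count, and equi-depth histogram bounds
// computed over the values not already covered by the MCV list.
public class ColumnStats {
    Object[] mostCommonValues;   // ordered by decreasing frequency
    double[] mcvFrequencies;     // fraction of total rows, parallel to the above
    Object[] histogramBounds;    // sorted; roughly equal row counts between bounds

    // Estimated selectivity of "column = value" using only the MCV list.
    public double equalitySelectivity(Object value) {
        for (int i = 0; i < mostCommonValues.length; i++) {
            if (mostCommonValues[i].equals(value)) return mcvFrequencies[i];
        }
        return 0.0;  // a real estimator would fall back to the histogram here
    }
}
```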

          Dmitriy V. Ryaboy added a comment -

          A couple more items:

          ResourceFieldSchema uses DataType for types; DataType is a collection of static methods and lookups, and cannot define a type. Instead, byte values are currently used to represent types, and the set of byte constants for the various types lives in DataType. So ResourceFieldSchema should represent types as bytes.

          Also, as I am working through a toy implementation, I find that it's very useful to have both ResourceSchema and ResourceStatistics declare getters and setters – this lets various serialization libraries serialize/deserialize them automatically.

          Dmitriy V. Ryaboy added a comment -

          Just wanted to note here that PIG-760 has a patch that includes MetadataWriter, MetadataLoader, ResourceSchema, and ResourceStatistics, all with some deviations from the proposal due to problems that came up while trying to use the interfaces.

          Alan Gates added a comment -

          A new branch in svn, load-store-redesign, has been created for work related to this JIRA. This work is being done on a branch since it will be quite disruptive for existing load and store functions.

          Pradeep Kamath added a comment -

          I have updated http://wiki.apache.org/pig/LoadStoreRedesignProposal with some changes in the interfaces and recorded the reasons in the "Changes" section at the bottom of the page. I have also cleaned up the topic a bit and added a few new sections giving details of the implementation so far in the branch. I have also added a list of remaining task items. Please review and provide comments - also it would be good to keep this topic up-to-date with changes discussed here and implemented on the branch.

          Dmitriy V. Ryaboy added a comment -

          Regarding statistics – what exactly is mBytes expected to represent? There are two possibilities: size on disk (easy to implement, kind of useless) and size in memory (quite useful, but hard to compute, especially for non-core Pig developers).

          Alan Gates added a comment -

          Size on disk. It's not quite useless, as it can be used to estimate number of splits, etc. It should also be possible to estimate size in memory based on size in disk by applying an average explosion factor (about 4x at the moment I believe).
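The disk-to-memory estimate Alan describes is simple arithmetic; the sketch below uses the ~4x explosion factor he mentions, with the class and constant names as illustrative assumptions.

```java
// Sketch: estimate in-memory size from on-disk size using an average
// explosion factor (about 4x at the time of this discussion).
public class SizeEstimator {
    static final double EXPLOSION_FACTOR = 4.0;

    public static double estimateMemoryMBytes(double diskMBytes) {
        return diskMBytes * EXPLOSION_FACTOR;
    }
}
```

As Dmitriy notes below, this factor is unreliable for compressed data, so any consumer should treat it as a rough heuristic.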

          Dmitriy V. Ryaboy added a comment -

          I agree, "useless" was a strong word (in fact I've been assuming it means size on disk and using it to estimate number of splits in my cbo code already...). The explosion factor is very iffy when we are dealing with compressed data. But, ok, let's not overthink it – we can save the memory question for the next iteration. I'll edit the wiki to note the mBytes is size on disk.

          Richard Ding added a comment -

          Since two methods on LoadFunc interface have default implementation (massageFilename for relativeToAbsolutePath, Utf8StorageConverter for getLoadCaster) that many implementers of LoadFunc can use, we propose to change LoadFunc to an abstract class. Use of abstract class also allows us to make backward-compatible changes to the API in later releases. One drawback is that the corresponding StoreFunc can't be an abstract class, since PigStorage now implements both interfaces and Java doesn't allow multiple inheritance. But we think it worthwhile to make the change.

          Another proposal is to make both LoadFunc and StoreFunc generic types. Both are using Hadoop's generic types (e.g. InputFormat, OutputFormat, RecordReader, RecordWriter) in their methods. Changing to generic types will provide compile-time type checking, reduce javac warnings, and force the implementers to specify the input/output types.

          Any comments?
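The two proposals above (abstract class with default implementations, plus generics) could be combined as sketched below; the method bodies and type parameters are illustrative stand-ins for Hadoop's InputFormat/RecordReader generics, not the final Pig API.

```java
// Sketch: an abstract, generic LoadFunc. Defaults like relativeToAbsolutePath
// are inherited by most loaders, and new methods can later be added with
// default bodies without breaking existing subclasses.
abstract class LoadFunc<K, V> {
    // Default many loaders can use unchanged: leave schemed or absolute
    // locations alone, resolve relative ones against the current directory.
    public String relativeToAbsolutePath(String location, String curDir) {
        return location.contains("://") || location.startsWith("/")
                ? location
                : curDir + "/" + location;
    }

    // Subclasses must still say how a (key, value) pair becomes a tuple;
    // the generic parameters give compile-time checking of reader types.
    public abstract Object getNext(K key, V value);
}
```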

          Pradeep Kamath added a comment -

          I wanted to check on others' thoughts on having the getSchema() method in LoadMetadata vs. LoadFunc.

          Pros:

          • getSchema() is inherently a metadata query and hence LoadMetadata seems the right category for it.

          Cons:

          • LoadMetadata has other methods (getStatistics, getPartitionKeys, setPartitionFilter) which are more relevant for SQL-like data stores. Loaders of self-describing data like BinStorage and, say, a JSON loader would want to implement getSchema() since they can provide a schema, but would need to return nulls for these other methods. I am wondering if this would make users think they are implementing the wrong interface, since they are returning nulls for most methods, or implementing the one method incorrectly - i.e. whether LoadMetadata is meant to be implemented only for SQL-like data stores.

          If we feel the con outweighs the pro, should we move getSchema into LoadFunc? Thoughts?
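The con above can be made concrete with a toy implementation; the interface shape loosely follows the proposal, but the stub class and return types here are illustrative assumptions, not real Pig code.

```java
// Sketch of the concern: a loader of self-describing data implementing
// LoadMetadata can answer getSchema() but has nothing useful to say for
// the statistics/partition methods, so it returns null for them.
interface LoadMetadataSketch {
    Object getSchema(String location);
    Object getStatistics(String location);
    String[] getPartitionKeys(String location);
}

class JsonLoaderMetadata implements LoadMetadataSketch {
    public Object getSchema(String location) {
        return "schema-inferred-from-data";  // self-describing format
    }
    public Object getStatistics(String location) {
        return null;  // no statistics store backing this loader
    }
    public String[] getPartitionKeys(String location) {
        return null;  // plain files, no partitioning
    }
}
```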

          Alan Gates added a comment -

          Updated http://wiki.apache.org/pig/LoadStoreRedesignProposal with section on how these changes will affect streaming.

          Dmitriy V. Ryaboy added a comment -

          LoadFunc has a method called determineSchema, not getSchema. This implies some sort of introspection, so I can see interpreting this as "if you are looking at the data, use determineSchema, and if you have a metadata store/repo then implement LoadMetadata".

          But I agree this is clunky and potentially confusing.

          I am of two minds about this. On one hand, moving the method makes sense, as it's metadata-related. On the other hand, it makes implementations that work with self-describing formats like Avro implement a heavy-looking interface, and requires further changes to existing LoadFunc implementations that will have to be ported.

          Another issue is that LoadMetadata.getSchema() returns a ResourceSchema, whereas LoadFunc.determineSchema() returns Pig's Schema. The two are compatible (I have a translation from one to the other in PIG-760), but not the same.

          Dmitriy V. Ryaboy added a comment -

          Regarding Streaming:

          We should support "Typed Bytes" as a binary protocol for streaming. This was a huge performance win for Dumbo (and I think Hive, as well).

          Here's a 7-slide intro: http://static.last.fm/johan/huguk-20090414/klaas-hadoop-1722.pdf

          Patch/discussion here: https://issues.apache.org/jira/browse/HADOOP-1722
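          For reference, typed bytes frames each value as a one-byte type code followed by the payload. The sketch below shows the round trip for two types; the type codes (3 for int, 7 for UTF-8 string) follow the HADOOP-1722 patch, but treat this as an illustrative sketch rather than a wire-compatible codec.

          ```java
          import java.io.*;

          // Minimal typed-bytes writer/reader for two types: a one-byte type
          // code, then the value (ints as 4 big-endian bytes, strings as a
          // length-prefixed UTF-8 byte array).
          class TypedBytes {
              static final int TYPE_INT = 3, TYPE_STRING = 7;

              static void writeInt(DataOutputStream out, int v) throws IOException {
                  out.writeByte(TYPE_INT);
                  out.writeInt(v);
              }

              static void writeString(DataOutputStream out, String s) throws IOException {
                  byte[] utf8 = s.getBytes("UTF-8");
                  out.writeByte(TYPE_STRING);
                  out.writeInt(utf8.length);
                  out.write(utf8);
              }

              static Object read(DataInputStream in) throws IOException {
                  switch (in.readByte()) {
                      case TYPE_INT:
                          return in.readInt();
                      case TYPE_STRING:
                          byte[] b = new byte[in.readInt()];
                          in.readFully(b);
                          return new String(b, "UTF-8");
                      default:
                          throw new IOException("unsupported type code");
                  }
              }
          }
          ```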

          Alan Gates added a comment -

          I've updated the wiki page to take into account Dmitriy's suggestion on using Typed Bytes. Olga also pointed out that we are getting rid of some optimizations in streaming with these changes, and I've added a section on that as well.

          Pradeep Kamath added a comment -

          In reference to my previous comment:

          I wanted to check on other's thoughts on having getSchema() method in LoadMetadata Vs LoadFunc

          I have left the getSchema() method in LoadMetadata as I did not hear any votes to move it to LoadFunc - have changed BinStorage accordingly in the incremental patch on PIG-1090.

          Dmitriy V. Ryaboy added a comment -

          Quick question:
          I don't remember if we've gone over this before – why is the sortedness information considered part of the schema? Shouldn't it be part of the statistics?

          Alan Gates added a comment -

          You can make an argument for putting it in either place. I argue for putting it in the schema for a couple of reasons:

          • It is useful to a large number of potential optimizations.

          • Unlike most other statistics, it can be used in correctness checks (e.g., the user asked for a merge join: is the data sorted on the join key?).

          The only downside I can see is that some systems that understand column names and types won't necessarily understand sortedness (like JSON). But it's no harder for the loader to figure out sortedness for the schema than for the statistics.
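          The merge-join example can be made concrete: the check is that the declared sort columns cover the join keys as a prefix, in join-key order. A hypothetical sketch, with sortCols standing in for whatever sortedness metadata the schema would carry:

          ```java
          // Hypothetical correctness check for a merge join: the data must be
          // sorted on the join keys, in join-key order. "sortCols" stands in
          // for the schema's sortedness metadata (null = not known sorted).
          class MergeJoinCheck {
              static boolean isValid(String[] sortCols, String[] joinKeys) {
                  if (sortCols == null || sortCols.length < joinKeys.length) {
                      return false;
                  }
                  for (int i = 0; i < joinKeys.length; i++) {
                      if (!sortCols[i].equals(joinKeys[i])) {
                          return false;
                      }
                  }
                  return true;
              }
          }
          ```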

          Pradeep Kamath added a comment -

          LoadFunc is now an abstract class with default implementations for some of the methods - we hope this will aid implementers. I would like to make the same change for StoreFunc. Since PigStorage currently does both load and store, we would also need to introduce an interface - StoreFuncInterface - so that PigStorage can extend LoadFunc and implement StoreFuncInterface. To be symmetrical, we would also introduce a LoadFuncInterface. This interface could be used by implementers who want their LoadFunc implementation to extend some other class. We can document and strongly recommend that users only use our abstract classes, since that would make them less vulnerable to incompatible additions in the future (hopefully when we add new methods to these abstract classes we will provide a default implementation).

          I will upload a patch for this unless anyone has strong objections.
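          A miniature sketch of the shape being proposed - all class and method names here are invented stand-ins, since Java's single inheritance is the constraint that forces the interface on the store side:

          ```java
          // Hypothetical miniature of the proposal: LoadFunc as an abstract
          // class with defaults, plus a store-side interface so a combined
          // loader/storer like PigStorage can extend one and implement the other.
          abstract class MiniLoadFunc {
              // A default implementation: future methods can ship with defaults
              // too, keeping existing subclasses source-compatible.
              public String relativeToAbsolutePath(String location) {
                  return location;
              }
              public abstract Object getNext();
          }

          interface MiniStoreFuncInterface {
              void putNext(Object tuple);
          }

          // Extends the abstract loader AND implements the store interface;
          // extending two abstract classes would not be possible in Java.
          class MiniPigStorage extends MiniLoadFunc implements MiniStoreFuncInterface {
              private Object last;
              public Object getNext() { return last; }
              public void putNext(Object tuple) { last = tuple; }
          }
          ```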

          Pradeep Kamath added a comment -

          In retrospect, I think we can skip creating a LoadFuncInterface, since there is currently no real use case for it - we were adding it only to keep symmetry with StoreFuncInterface and to allow implementations that extend other classes to implement the interface. The first motivation is not very strong, and the second can also be achieved through composition rather than inheritance - it is unclear how inheriting from a different class would benefit a loader implementation over composing with that class and delegating to it. By introducing a LoadFuncInterface we would be exposing users who implement it to backward-incompatible additions in the future. So I think we should not add a LoadFuncInterface now, and add it ONLY if a real need arises. The rest of my proposal (making StoreFunc an abstract class and adding a new StoreFuncInterface) still holds.
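          The composition alternative can be sketched like so: instead of the loader inheriting from some other class, it holds an instance and delegates. Names here are invented for illustration; this shows the pattern, not Pig's API.

          ```java
          // A helper class the loader author might otherwise want to inherit from.
          class ExistingHelper {
              String normalize(String location) {
                  return location.trim();
              }
          }

          // Composition instead of inheritance: the loader owns a helper and
          // delegates to it, leaving the loader free to extend an abstract
          // LoadFunc base class instead of the helper.
          class ComposedLoader {
              private final ExistingHelper helper = new ExistingHelper();

              String prepareLocation(String location) {
                  return helper.normalize(location); // delegate, don't inherit
              }
          }
          ```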

          Olga Natkovich added a comment -

          Jeff, are you planning to work on this for 0.8.0 release?


            People

            • Assignee:
              Alan Gates
              Reporter:
              Alan Gates
            • Votes:
              0
              Watchers:
              7
