PIG-2672: Optimize the use of DistributedCache

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.13.0
    • Component/s: None
    • Labels: None

      Description

      Pig currently copies jar files to a temporary location in HDFS and then adds them to the DistributedCache for each job launched. This is inefficient in terms of:

      • Space - The jars are distributed to the tasktrackers for every job, taking up a lot of local temporary space on the tasktrackers.
      • Performance - The jar distribution impacts the job launch time.

      Attachments

      1. PIG-2672-10.patch
        12 kB
        Aniket Mokashi
      2. PIG-2672-7.patch
        11 kB
        Aniket Mokashi
      3. PIG-2672-5.patch
        43 kB
        Aniket Mokashi
      4. PIG-2672.patch
        23 kB
        Aniket Mokashi

        Issue Links

          Activity

          Rohini Palaniswamy created issue -
          Rohini Palaniswamy added a comment -

          Proposed Solution:

          • For each user, create a .pig directory. For example: /user/rohini/.pig. Copy the pig libraries to /user/rohini/.pig/piglib/pig-[version]/ and then add them to the distributed cache. If the jars are already present in HDFS, just add them to the distributed cache.
          • Copy the user libraries to /user/rohini/.pig/userlib/jarname-[checksum|filesize].jar and then add them to the distributed cache. If a jar with the same checksum is already present in HDFS, just add it to the distributed cache.
          • This will allow shipping of jars/udfs only once to the cluster and prevent multiple copies in different locations on the tasktracker.
          • The reasoning for including the checksum or filesize in the copied jar's name is to avoid job failures due to overwriting of jars. For example: there is a user jar that is copied as part of one pig job. If the user runs another pig job with a modified version of the same jar while the old job is running, there will be a conflict. The cleanup job checks whether the files in the distributed cache have the same timestamp as the original HDFS file and fails the job if that is not the case. So even if the old job's map/reduce tasks completed successfully, it will fail in cleanup.
          • This solution can be controlled by a configuration setting. If turned off, it can revert to the old behaviour.

          We have used this approach for our data-loading application, which runs upwards of 50K jobs every day, each shipping around 5 jars, and it improved job launch performance quite a bit. With the larger number of jars in pig it should show even more improvement in performance. Currently pig takes a relatively long time to launch a job.
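
          For illustration only, a minimal sketch of the per-user library cache described above, written against the Hadoop 1.x FileSystem and DistributedCache APIs; the class and method names are hypothetical and not taken from the attached patches:

              import java.io.IOException;

              import org.apache.hadoop.conf.Configuration;
              import org.apache.hadoop.filecache.DistributedCache;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;

              public class PigLibCacheSketch {
                  /**
                   * Ships a Pig library jar via a versioned per-user HDFS directory,
                   * copying it only when it is not already there.
                   */
                  public static void shipPigLib(String localJar, String pigVersion, Configuration conf)
                          throws IOException {
                      FileSystem fs = FileSystem.get(conf);
                      String jarName = new Path(localJar).getName();
                      // e.g. /user/rohini/.pig/piglib/pig-0.11.0/pig.jar
                      Path cached = new Path(fs.getHomeDirectory(),
                              ".pig/piglib/pig-" + pigVersion + "/" + jarName);
                      if (!fs.exists(cached)) {
                          fs.copyFromLocalFile(new Path(localJar), cached);
                      }
                      // Register the cached copy; it is uploaded once and reused by later jobs.
                      DistributedCache.addFileToClassPath(cached, conf);
                  }
              }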

          Daniel Dai added a comment -

          Yes, this is in line with our observation in HCATALOG-385. Shipping a new jar to HDFS creates a new entry in the distributed cache; reusing them does not.

          Moreover, we see issues when there are too many jars in the distributed cache (hadoop also unjars them); we run out of inodes.

          +1 for this change.

          Rohini Palaniswamy added a comment -

          Yes. Pig also currently does DistributedCache.addFileToClassPath (or tmpfiles in the jobconf). It should be DistributedCache.addArchiveToClassPath (or tmpjars in the jobconf) instead.

          I have not seen unjar happening when you do DistributedCache.addArchiveToClassPath.
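
          For reference, a minimal sketch contrasting the two DistributedCache calls mentioned above (Hadoop 1.x API); the HDFS path and class name are made up for the example:

              import java.io.IOException;

              import org.apache.hadoop.conf.Configuration;
              import org.apache.hadoop.filecache.DistributedCache;
              import org.apache.hadoop.fs.Path;

              public class CacheRegistrationSketch {
                  public static void register(Configuration conf) throws IOException {
                      // Hypothetical HDFS path of an already-uploaded jar.
                      Path jarInHdfs = new Path("/user/rohini/.pig/userlib/myudfs.jar");

                      // What Pig does today: register the jar as a classpath file.
                      DistributedCache.addFileToClassPath(jarInHdfs, conf);

                      // Suggested alternative: register it as a classpath archive instead.
                      DistributedCache.addArchiveToClassPath(jarInHdfs, conf);
                  }
              }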

          Daniel Dai added a comment -

          Which version of hadoop are you using? I also notice that in some versions of hadoop (e.g., 0.23) it does not unjar, which spawns some other issues such as PIG-2486.

          Dmitriy V. Ryaboy added a comment -

          This would be a great addition.
          A couple of proposed refinements to the design:

          1) The same behavior should happen on the local client for cases when users register jars from HDFS (no need to copy if a jar with the same name+checksum is cached locally).
          2) The directory should be .pig/jarcache/ or similar.
          3) We should be very explicit about documenting this behavior and provide management tools for this cache, so people don't get surprised as it grows progressively bigger.
          4) It could be helpful to have a configurable cluster-level cache, instead of or in addition to the user-level cache, for cases when many users are using the same jar. There may be security concerns with that.

          Daniel Dai added a comment -

          Sounds reasonable. Yes, for security reasons, we can start with a user-level cache.

          Daniel Dai added a comment -

          Also, this sounds like a more general issue for Hadoop. I feel it would be better if Hadoop solved this problem, which would benefit HCat, Hive, etc. We should open a ticket in Hadoop.

          Dmitriy V. Ryaboy added a comment -

          Can we do both? I can roll pig versions much faster than I can roll Hadoop versions (no restart required, fewer moving parts...)

          Rohini Palaniswamy added a comment -

          We have used all versions of hadoop from 0.20 and 0.20S to 23 and have never seen it unjarred till now. I verified this by checking the cache directory of production tasktrackers on both 0.20.205 and 0.23. They are not unjarred and we are certainly using "tmpjars".

          But looking at the code in TrackerDistributedCacheManager, I am wondering why it did not unjar. The code definitely seems to be unjarring. Confused and need to dig deeper.

          Daniel Dai added a comment -

          Yes, even if Hadoop agrees to fix it, that is a long time away. We can fix it in Pig, and I will definitely port it to HCat, and maybe Hive.

          Rohini Palaniswamy made changes -
          Field Original Value New Value
          Assignee Rohini Palaniswamy [ rohini ]
          Aniket Mokashi added a comment -

          For the unjar behavior, I think hadoop does not unjar if it has https://issues.apache.org/jira/browse/MAPREDUCE-967.

          Aniket Mokashi made changes -
          Link This issue relates to HADOOP-9639 [ HADOOP-9639 ]
          Aniket Mokashi made changes -
          Fix Version/s 0.12 [ 12323380 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672.patch [ 12604324 ]
          Aniket Mokashi made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Aniket Mokashi added a comment -

          I have attached a patch that adds two configuration parameters: cluster.cache.location and user.cache.location.

          Jars are copied to <cache.location>/a/b/c/checksum-jarname.jar where a, b, c are the first 3 characters of the checksum. When a new jar is registered, its checksum is calculated and we check whether a jar with the same name/checksum exists in the cache. If yes, the copy to HDFS is avoided.

          Permission to write to the cache is managed by HDFS permissions. Also, it is not possible to overwrite a jar using this mechanism: if a jar changes, its checksum will also change and it will be a new jar in the cache. Removal of old jars is a manual step; admins/users can list jars under the cache location and remove the ones that are very old. Alternatively, you can delete all the jars in the cache or change the jar cache location, and the cache will be repopulated by running jobs.

          If this approach looks reasonable, I can add a few more tests. Comments welcome!
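
          To make the naming scheme concrete, a minimal sketch of how such a cache path could be derived and used; this is an illustration under the assumptions described in the comment, not the code from the attached patch:

              import java.io.FileInputStream;
              import java.io.IOException;

              import org.apache.commons.codec.digest.DigestUtils;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;

              public class JarCachePathSketch {
                  /**
                   * Computes <cacheLocation>/a/b/c/<checksum>-<jarName>, where a, b, c
                   * are the first three characters of the jar's SHA-1 checksum.
                   */
                  public static Path cachePath(String cacheLocation, String localJar) throws IOException {
                      FileInputStream in = new FileInputStream(localJar);
                      String checksum;
                      try {
                          checksum = DigestUtils.shaHex(in);
                      } finally {
                          in.close();
                      }
                      String jarName = new Path(localJar).getName();
                      String fanOut = checksum.charAt(0) + "/" + checksum.charAt(1) + "/" + checksum.charAt(2);
                      return new Path(cacheLocation, fanOut + "/" + checksum + "-" + jarName);
                  }

                  /** Copies the jar only if an entry with the same checksum is not already cached. */
                  public static Path cacheIfAbsent(FileSystem fs, String cacheLocation, String localJar)
                          throws IOException {
                      Path cached = cachePath(cacheLocation, localJar);
                      if (!fs.exists(cached)) {
                          fs.copyFromLocalFile(new Path(localJar), cached);
                      }
                      return cached;
                  }
              }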

          Aniket Mokashi added a comment -

          Note: HADOOP-9639 has an improved mechanism for this. However, this is still somewhat useful for users who are on old versions of hadoop.

          Aniket Mokashi made changes -
          Assignee Rohini Palaniswamy [ rohini ] Aniket Mokashi [ aniket486 ]
          Rohini Palaniswamy added a comment -

          I can take a look at this one. Can you put this up in review board please?

          Dmitriy V. Ryaboy added a comment -

          Aniket, can we prefix the properties with "pig."? That way we won't conflict with potential properties from Hadoop, and it's a little easier to analyze stuff when looking at the jobconf.

          Aniket Mokashi added a comment -

          RB: https://reviews.apache.org/r/14274/
          Aniket Mokashi added a comment -

          Thanks Dmitriy! I will make those changes.

          Aniket Mokashi added a comment -

          Oh, actually I just noticed that the config names are pig.shared.cluster.cache.location and pig.shared.user.cache.location.

          Koji Noguchi added a comment -

          Note: HADOOP-9639 has improved mechanism for this.

          I haven't read the patch but I thought HADOOP-9639 introduces a security hole unless NodeManager does the SHA-1 level checksumming.

          Rohini Palaniswamy added a comment -

          Not able to access the review board. Is it just me, or is the review board down?

          Cheolsoo Park added a comment -

          It seems down to me as well.

          Cheolsoo Park added a comment -

          Aniket Mokashi, I made some minor comments in the RB, mostly coding style-related. I haven't tested the patch on a real cluster, but can I assume you did?
          Rohini Palaniswamy, please take another look. You're more familiar with hdfs than I am.

          Cheolsoo Park added a comment -

          Forgot to mention. Aniket Mokashi, can you please document this? Perhaps on the performance and efficiency page?

          Koji Noguchi added a comment -

          In a secure hadoop environment, this patch would basically create a hole and allow any user having write access to PIG_SHARED_CLUSTER_CACHE_LOCATION to become other users (who are sharing this cache location).

          For now, can we instead limit the patch to /user/<username>/.pig or .staging and have an extra check for permission 700?

          I understand that you can make PIG_SHARED_CLUSTER_CACHE_LOCATION writable only by an admin, but I'm afraid this patch would make it too easy to misconfigure.

          Aniket Mokashi added a comment -

          Cheolsoo Park, thanks for your comments. I will work on the patch to make it more production-ready. I have tried it on a simple job, but not in production yet.

          Koji Noguchi, I do not understand your concern here. Currently jars get copied to /tmp/temp-<random>/, which can be written to by all users. I do not see how the jar cache is less secure than the current approach. In fact, any misconfiguration is still protected by SHA (hard to collide).

          I do not see any benefit in restricting it to /user/<username>/.pig, as it is not mandatory to have that directory secured for users (am I right?). If you look closely, the cluster cache and the user cache have exactly the same behavior. The only reason we have two is for easy configuration and better dedup of jars across the cluster.

          Rohini Palaniswamy added a comment -

          Aniket Mokashi,

          Currently jars get copied to /tmp/temp-<random>/ which can be written by all users

          No, they do not. They go into /user/<username>/.staging, which is readable and writable only by that user. Even if they were to go to /tmp/temp- (where the intermediate files now go), we have dfs.umaskmode set to 077, so only the user has rwx and no one else does.

          It is good to have a shared cluster location, but if someone accidentally deletes that directory then all user jobs already launched will fail. It would be good if you can add a check to see whether the cache dir is writable before trying to create it there. People with a multi-tenant environment like us can then choose to place frequently used jars in the shared cluster location but protect it with 755 so that others don't write into it.

          Rohini Palaniswamy added a comment -

          I guess you don't have to check for permissions as you are anyway returning null on an IOException.

          Koji Noguchi added a comment -

          In fact, any misconfiguration is still protected by SHA (hard to collide).

          SHA is meaningless here unless it is verified by a trusted entity (the NodeManager or TaskTracker in HADOOP-9639).
          Say abc.jar was installed locally. UserEvil can figure out what the shared hdfs path is, since he has access to the local file.
          Then UserEvil can upload any kind of jar with that filename, as long as he is the first user to upload.

          Now, any user trying to use this local abc.jar would unknowingly be executing the random jar uploaded by UserEvil.

          Aniket Mokashi added a comment -

          Rohini Palaniswamy, from the current code, we have:

          Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);

          Hence, files are (by default) copied to /tmp/temp-<random>/. I do not see a way to configure it to a relative path, but I might be wrong.

          UserEvil can figure out what the shared hdfs path is since he has access to the local file.

          This is true even today, where UserEvil can look into the jobconf to find the location of the jars and replace whatever jars he wants. Even if they are protected as Rohini explained earlier, the protection still comes from HDFS and not from pig.

          I'm deliberately avoiding permission checks in this code path. In terms of security, I feel that this is no worse than what we have right now.

          Next steps-
          1. Address code review comments from RB and submit a fresh patch.
          2. Run this for several jobs in practice and ensure there are no bad/side effects.
          3. Cheolsoo Park, can you please help me with e2e for this?
          4. Open a documentation jira and explain how this works in pig docs.

          Anything else I missed?

          Cheolsoo Park added a comment -

          Aniket Mokashi, are you asking me to run the e2e tests with your new patch? Yes.

          Rohini Palaniswamy added a comment -

          Aniket Mokashi,
          It is an issue that we need to fix. We need to set 700 on FileLocalizer.relativeRoot when we create it.

          If you look at the getStagingDir() method in http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?revision=1206848&view=markup, they check and throw an error if the staging dir where the jars are created is not 700 and owned by that user. We need to add that check for the user cache location as well, else it is a security hole.

          Jason Lowe added a comment -

          I'm deliberately avoiding in permission checks in this code path. In terms of security, I feel that this is no worse than what we have right now.

          A shared cache where anyone can write is indeed worse. Today jars are being uploaded to HDFS into a private staging directory where no other normal user can interfere. If the staging directory were to become publicly writeable then it becomes trivial to compromise all users trying to run the same pig jar using a scheme like Koji Noguchi pointed out. I don't see how one can accomplish the same level of havoc today. Even if there's a window in the local filesystem where one can hijack a jar, that requires access to the same node where the user is launching the job. In the publicly-writeable shared cache scheme, one only needs access to HDFS from any node and clients on all nodes using the shared cache can be compromised.

          Besides malicious users, the shared cache can also be accidentally made ineffective by clients. For example, a user with a restrictive umask (e.g.: 077) uploads a jar to the shared cache, and all the directories and files were created such that others can't read them. Now because the permissions are incorrect any other user can't share the file and any other user's file that happens to have the same initial digit(s) in its hash can't be uploaded to the shared cache. And then there's the client that deletes files in-use by other clients, breaking their jobs.

          In short, shared public caches that are publicly writeable are going to be problematic, especially in secure setups. As such I think there should at least be some documentation describing the risks of enabling it and how it could be used in a read-only manner for sharing securely, i.e.: shared cache is publicly readable but only writeable by admins who manually maintain the entries in the shared cache.

          Aniket Mokashi added a comment -

          Today jars are being uploaded to HDFS into a private staging directory where no other normal user can interfere

          Where in pig do we mark this private? Can you point me to the line number? If it is outside of pig, we can do the same even now.

          Aniket Mokashi made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Aniket Mokashi made changes -
          Assignee Aniket Mokashi [ aniket486 ]
          Aniket Mokashi added a comment -

          We need to set 700 on FileLocalizer.relativeRoot when we create it.

          If it is 700, others cannot read the cache. The user can only share jars with himself.

          Rohini Palaniswamy added a comment -

          can you point me to the line number? If its outside of pig, we can do the same even now.

          job.jar, job.xml, etc. are copied to /user/<username>/.staging by the JobClient. Refer to my previous comment for the source code reference in hadoop.

          Rohini Palaniswamy added a comment -

          FileLocalizer.relativeRoot is not the cache location. It is the directory created for each pig script run to store temporary data. We need to set 700 on it.

          Aniket Mokashi added a comment -

          Currently, jars are copied to the path computed by new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix).

          Aniket Mokashi added a comment -

          I think we are confusing job.jar with shipped jars.

          Rohini Palaniswamy added a comment -

          To clarify:
          1) In 0.10, all jars (including pig and registered jars) were packaged into job.jar, which was copied into /user/<username>/.staging by the JobClient. In 0.11, registered extra jars are copied to FileLocalizer.getTemporaryPath(pigContext), which is a directory under FileLocalizer.relativeRoot, but job.jar is still copied into /user/<username>/.staging by the JobClient. To address the FileLocalizer.getTemporaryPath security issue we need to set 700 on FileLocalizer.relativeRoot. This is an existing security problem in 0.11. With your patch you copy to a shared or user cache location, and if neither is configured you still fall back to FileLocalizer.getTemporaryPath, so it needs to be addressed.
          2) The second thing is writing to the user cache location, which is introduced in this patch. Before writing to it we need to check that it is 700 and owned by that user, similar to the check done by the JobClient for /user/<username>/.staging.
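
          A minimal sketch of the kind of ownership and permission check described in point 2, modeled loosely on JobSubmissionFiles.getStagingDir; the class and method names are hypothetical:

              import java.io.IOException;

              import org.apache.hadoop.fs.FileStatus;
              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;
              import org.apache.hadoop.fs.permission.FsPermission;
              import org.apache.hadoop.security.UserGroupInformation;

              public class UserCacheDirCheck {
                  private static final FsPermission USER_ONLY = FsPermission.createImmutable((short) 0700);

                  /** Refuses to use the user cache dir unless it is 700 and owned by the current user. */
                  public static void verify(FileSystem fs, Path cacheDir) throws IOException {
                      String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
                      FileStatus status = fs.getFileStatus(cacheDir);
                      if (!status.getOwner().equals(currentUser)
                              || !status.getPermission().equals(USER_ONLY)) {
                          throw new IOException("User cache dir " + cacheDir + " must be owned by "
                                  + currentUser + " with permission 700");
                      }
                  }
              }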

          Aniket Mokashi added a comment -

          Thanks everyone for the comments!

          We might need a little more refactoring before we change permissions on the temporary path. Currently FileLocalizer.getTemporaryPath(pigContext) is used for several things (including intermediate data, I think). Let me open a jira to track that.

          I will try your approach (shared user cache) and submit a new patch soon.

          Rohini Palaniswamy added a comment -

          We might need little more refactoring before we change permissions on temporary path. Currently FileLocalizer.getTemporaryPath(pigContext); is getting used for several things. (+for intermediate data, I think)

          Yes. All intermediate data goes into that. Setting 700 should not cause problems, but ElementDescriptor does not have methods for chmod and that would require refactoring, which is probably what you meant. So a separate jira sounds good.

          Daniel Dai made changes -
          Fix Version/s 0.13.0 [ 12324971 ]
          Fix Version/s 0.12.0 [ 12323380 ]
          Aniket Mokashi added a comment -

          Opened: https://issues.apache.org/jira/browse/PIG-3511
          Aniket Mokashi added a comment -

          Another attempt:
          Using stagingDir = JobSubmissionFiles.getStagingDir(jobClient, conf); to copy the shared files. Obviously, this is not the perfect solution to this problem, and YARN-1492 will present a better fix for it.

          Aniket Mokashi made changes -
          Attachment PIG-2672-5.patch [ 12624061 ]
          Aniket Mokashi added a comment -

          I realized this fix won't work with Hadoop 2 (at least not easily). Let me try to add some shims to fix it.

          In the meantime, please comment on the approach.

          Aniket Mokashi added a comment - (edited)

          Another proposal: we create /tmp/$user.name/jarcache with permission 700 and use it as a user-level jar cache. Also, every time a jar is used from the jar cache, we do fs.setTimes(jarpath, now, now) to update the atime/mtime of the jar (to avoid cleanups).
          Rohini Palaniswamy, thoughts?

          Rohini Palaniswamy added a comment -

          > JobSubmissionFiles.getStagingDir(jobClient, conf); or We create /tmp/$user.name/jarcache
          I think we should create /user/$user.name/.pig/filecache (not calling it jarcache, as we can have files used in streaming as well) and set the permissions of filecache to 700. That way it is cleaner (long-term user data stays in the user directory) and we also don't have to rely on hadoop APIs to get the .staging dir location. Please do not modify the mtime of the jar: if a distributed cache jar's mtime has been modified when a job completes, hadoop fails the job.
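
          A minimal sketch of creating such a filecache directory with 700 permissions, under the assumptions in the comment above; the class and method names are hypothetical:

              import java.io.IOException;

              import org.apache.hadoop.fs.FileSystem;
              import org.apache.hadoop.fs.Path;
              import org.apache.hadoop.fs.permission.FsPermission;

              public class FileCacheDirSketch {
                  /** Creates /user/<username>/.pig/filecache with permission 700 if it does not exist. */
                  public static Path ensureFileCacheDir(FileSystem fs) throws IOException {
                      Path cacheDir = new Path(fs.getHomeDirectory(), ".pig/filecache");
                      FsPermission userOnly = new FsPermission((short) 0700);
                      if (!fs.exists(cacheDir)) {
                          fs.mkdirs(cacheDir, userOnly);
                      }
                      // mkdirs applies the client umask, so set the permission explicitly as well.
                      fs.setPermission(cacheDir, userOnly);
                      return cacheDir;
                  }
              }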

          Aniket Mokashi added a comment -

          Hadoop uses the following code to get the stagingDir:

          Path stagingRootDir =
              new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
                  "/tmp/hadoop/mapred/staging"));

          which indicates that /user/$user.name may not be available (configured). I can use the staging dir stagingRootDir + user + "/.pig". Would that work?

          Koji Noguchi added a comment -

          Thanks Aniket! I like the non-shared approach.

          (to avoid cleanups).

          I had a discussion with Jason Lowe about this before. On our cluster, the longest job a user can run is 1 week (due to the delegation token limit we set). With this assumption, we can create a cache under .Trash as below.

          $ echo /user/$USER/.Trash/$(date -d 'next monday + 1week' +'%y%m%d'000000)
          /user/knoguchi/.Trash/140203000000 (this is the 0.23/2.* format; 0.20 uses a slightly different format)

          This way, files are reused for 1 week and then thrown away later automatically by a Trash cleanup.
          We threw away the idea for various reasons, but just wanted to share it here.

          Rohini Palaniswamy added a comment -

          stagingRootDir + user + "/.pig" is good. But if stagingRootDir starts with fs.getHomeDirectory(), can you make it stagingRootDir + "/.pig"? This will avoid creating /user/<username>/<username>/.pig.

          This way, files are reused for 1 week and then thrown away later automatically by a Trash cleanup.

          The cache will be reused by other pig jobs during and after the week, and we will not be modifying the times of the files. So we can't put it under .Trash, as it will be cleaned up.

          Koji Noguchi added a comment -

          The cache will be reused by other pig jobs during and after the week and we will not be modifying the time of the files. So we can't put that under .Trash as it will be cleaned up.

          Maybe I didn't explain it well enough.
          Tasks will always use the cache that is scheduled for deletion on the Monday after next.
          Given that the longest job (on our cluster) is 1 week, there won't be any jobs using that cache when it's expunged.

          Dmitriy V. Ryaboy added a comment -

          Seems like there is a lot of effort being spent here reinventing what is already designed for the general use case in the yarn ticket Aniket linked. Let's not let the best be the enemy of the good and just get something in that will be decent for most cases; if people don't like it, they can turn it off. This is an intermediate solution until that yarn patch goes in, at which point all of this becomes moot.

          Dmitriy V. Ryaboy added a comment -

          Koji Noguchi in the spirit of keeping things moving – can we commit this? You can feel free to turn the behavior off on your cluster if you are worried about the 1 week boundary. If that's the case, feel free to open another ticket to follow up, or to make sure that YARN-1492 fixes your issue.

          Rohini Palaniswamy added a comment -

          To me, keeping it in the user home directory instead of .Trash is OK. I can discuss with Koji offline.

          Aniket Mokashi,
          I did not see until now that there was an updated patch uploaded. I was waiting for a new patch from you on the review board. If you could update the review board with the recent patch, I will take a quick look and give a +1. It is hard to review a plain patch as the patch is big.

          Koji Noguchi added a comment -

          Koji Noguchi in the spirit of keeping things moving – can we commit this?

          Sure, sure. I'm fine as long as we don't share the jar across multiple users. I didn't mean to block Aniket's latest patch. Sorry about that.

          As for committing, I didn't look at the code in detail, so I'm assuming you or someone else will give a +1 on the patch. Thanks again for listening to my concerns.

          Aniket Mokashi added a comment -

          Rohini Palaniswamy, I should do it today or tomorrow.

          Aniket Mokashi made changes -
          Attachment PIG-2672-7.patch [ 12626226 ]
          Aniket Mokashi made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Assignee Aniket Mokashi [ aniket486 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672-7.patch [ 12626226 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672-7.patch [ 12626239 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672-9.patch [ 12626611 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672-9.patch [ 12626611 ]
          Aniket Mokashi made changes -
          Attachment PIG-2672-10.patch [ 12626742 ]
          Rohini Palaniswamy added a comment -

          +1

          Aniket Mokashi added a comment -

          Committed to trunk. Thanks everyone for your inputs and thanks Rohini Palaniswamy for the review.

          Aniket Mokashi made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          Brock Noland made changes -
          Link This issue is related to HIVE-860 [ HIVE-860 ]
          Brock Noland added a comment -

          FYI, in HIVE-860 a reviewer asked me whether the following code (copied from this patch) closes the stream:

          String checksum = DigestUtils.shaHex(url.openStream());

          It doesn't look like it does, according to the commons-codec source. Therefore I think pig has a file descriptor leak.
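
          For illustration, a minimal sketch of computing the checksum while making sure the stream is closed; the helper class and method names are hypothetical:

              import java.io.IOException;
              import java.io.InputStream;
              import java.net.URL;

              import org.apache.commons.codec.digest.DigestUtils;

              public class ChecksumSketch {
                  /** Computes the SHA-1 hex checksum of the resource, closing the stream afterwards. */
                  public static String shaHexOf(URL url) throws IOException {
                      InputStream in = url.openStream();
                      try {
                          return DigestUtils.shaHex(in);
                      } finally {
                          in.close(); // DigestUtils does not close the stream itself
                      }
                  }
              }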

          Aniket Mokashi added a comment -

          Thanks Brock Noland! Looks like it existed even before this @ https://github.com/apache/pig/blob/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java#L1524. Let me open another jira to fix it.

            People

            • Assignee: Aniket Mokashi
            • Reporter: Rohini Palaniswamy
            • Votes: 0
            • Watchers: 10
