PIG-3288: Kill jobs if the number of output files is over a configurable limit

    Details

    • Type: Wish
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      I ran into a situation where a Pig job tried to create too many files on HDFS and overloaded the NameNode. To prevent such events, it would be nice if we could set an upper limit on the number of files that a Pig job can create.

      In fact, Hive has a property called "hive.exec.max.created.files". The idea is that each mapper/reducer increments a counter every time it creates a file. MRLauncher then periodically checks whether the number of files created so far has exceeded the upper limit; if so, it kills the running jobs and exits.
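      A minimal client-side sketch of that pattern, using only the plain Hadoop Job API (the counter group and name here are illustrative placeholders, not the ones Hive or the patch actually use):

```java
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

// Illustrative client-side monitor: periodically poll an aggregated job
// counter and kill the job once it crosses a configured limit.
public final class CreatedFilesMonitor {
    public static void monitorAndKill(Job job, long maxFiles) throws Exception {
        while (!job.isComplete()) {
            // Group/name are hypothetical placeholders.
            Counter created = job.getCounters().findCounter("pig", "CREATED_FILES");
            if (created != null && created.getValue() > maxFiles) {
                job.killJob(); // stop before the NameNode is flooded with files
                throw new RuntimeException(
                        "Killed job: created more than " + maxFiles + " files");
            }
            Thread.sleep(10_000); // poll every 10 seconds
        }
    }
}
```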

      Attachments

      1. PIG-3288-5.patch
        13 kB
        Cheolsoo Park
      2. PIG-3288-4.patch
        11 kB
        Cheolsoo Park
      3. PIG-3288-3.patch
        11 kB
        Cheolsoo Park
      4. PIG-3288-2.patch
        11 kB
        Cheolsoo Park
      5. PIG-3288.patch
        9 kB
        Cheolsoo Park

        Activity

        Cheolsoo Park added a comment -

        Attached is a patch that implements the following:

        • I added a new property called pig.exec.hdfs.files.max.limit.
        • When this property is enabled, MRLauncher periodically monitors a counter (CREATED_FILES_COUNTER).
        • Since the number of files created by a mapper/reducer is RecordWriter-specific, each storage is responsible for incrementing the counter properly. As a reference example, I added code that increments the counter in PigLineRecordWriter for PigStorage (a rough sketch of the idea follows below).
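        For illustration only, the per-writer increment could look roughly like this, assuming a Hadoop 2-style TaskAttemptContext (which exposes getCounter(group, name)); the counter group/name are placeholders, not the patch's exact identifiers:

```java
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Sketch: count one output file per RecordWriter instance, since MapReduce
// creates one writer per file a task writes.
public class CountingRecordWriter extends RecordWriter<NullWritable, Text> {
    private final DataOutputStream out;

    public CountingRecordWriter(DataOutputStream out, TaskAttemptContext ctx) {
        this.out = out;
        // Placeholder group/name; the patch's counter is CREATED_FILES_COUNTER.
        ctx.getCounter("pig", "CREATED_FILES").increment(1);
    }

    @Override
    public void write(NullWritable key, Text value) throws IOException {
        out.write(value.getBytes(), 0, value.getLength());
        out.write('\n');
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException {
        out.close();
    }
}
```
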
        Cheolsoo Park added a comment -

        Added an e2e test case and updated the docs.

        Regarding the e2e test case, what it does is as follows (a rough sketch appears after the list):

        • Set pig.exec.created.files.max.limit to 1.
        • Make Pig create output files.
        • Verify that the Pig job is killed.
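        A rough Java sketch of the scenario via PigServer (the real test lives in Pig's e2e harness; the paths and the failure check here are illustrative):

```java
import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

// Sketch of the e2e scenario: cap created files at 1, store output, and
// expect the job to be killed. Paths are illustrative.
public class MaxCreatedFilesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("pig.exec.created.files.max.limit", "1");

        PigServer pig = new PigServer(ExecType.MAPREDUCE, props);
        pig.registerQuery("a = LOAD '/tmp/input' USING PigStorage();");
        try {
            pig.store("a", "/tmp/output"); // should trip the limit
            System.err.println("Expected the job to be killed");
        } catch (Exception expected) {
            System.out.println("Job killed as expected: " + expected.getMessage());
        }
    }
}
```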

        Ready for review. Thanks!

        Daniel Dai added a comment -

        How about putting the counter logic in the StoreFunc? It seems your patch only deals with the PigTextOutputFormat-based StoreFunc. If we can push this into StoreFunc, other StoreFuncs can implement the same thing.

        Cheolsoo Park added a comment -

        That's actually a good idea. I will move the default implementation to StoreFunc and let other storers override it. Thank you very much for the suggestion!

        Cheolsoo Park added a comment -

        I updated my patch as follows:

        • I removed the counter logic from PigTextOutputFormat, but I didn't push it into StoreFunc. The reason is that this logic is storage-specific, so I wanted to leave the implementation to storages. Even though we could provide a default implementation in the StoreFunc class, it wouldn't be useful unless other storages subclassed it.
        • However, I still needed a storage that increments the counter for the test, so I wrote one by wrapping PigStorage with StoreFuncWrapper (a rough sketch follows below). I updated my e2e test to use this storage.
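        A minimal sketch of such a wrapper, assuming PigStatusReporter.getInstance().getCounter(group, name) is available to the task; the class and counter names are placeholders rather than the patch's exact ones:

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.StoreFuncWrapper;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.tools.pigstats.PigStatusReporter;

// Sketch of a counting storage in the spirit of PigStorageWithFileCount:
// delegate everything to PigStorage and bump a counter once per
// RecordWriter, i.e. roughly once per output file.
public class CountingPigStorage extends StoreFuncWrapper {

    public CountingPigStorage() {
        setStoreFunc(new PigStorage());
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        // Placeholder group/name, not the patch's exact identifiers.
        Counter files = PigStatusReporter.getInstance().getCounter("pig", "CREATED_FILES");
        if (files != null) {
            files.increment(1);
        }
        super.prepareToWrite(writer);
    }
}
```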

        Tests done:

        • Ran the new e2e test case TooManyFilesCreatedErrors_1 on a cluster.
        • Ran test-commit.

        Thanks!

        Cheolsoo Park added a comment -

        ReviewBoard: https://reviews.apache.org/r/11719/
        Aniket Mokashi added a comment -

        The implementation is generic enough that the counter need not count the number of files; it could really count arbitrary metrics and kill the job once they are exceeded. Should we rename the counter from "pig.exec.created.files.max.limit" to something else?
        Also, in the StoreFunc, you are relying on the fact that for each new file, the StoreFunc is reinitialized as a new object. Is that guaranteed behavior?

        Cheolsoo Park added a comment -

        Aniket Mokashi, thank you very much for your feedback!

        1. I like your suggestion regarding the name of the property/counter. I'll probably change it to "pig.exec.termination.counter.limit". Let me know if you have a better suggestion.
        2. The storefunc (PigStorageWithFileCount) that I wrote is just for the e2e test, and with it, a new storefunc is indeed initialized for each new file. Again, how to increment the counter depends entirely on the storage implementation. For example, if you're using CombinedOutputFormat, it's your responsibility to increment the counter properly in your storage. I have documented this clearly.
        Cheolsoo Park added a comment -

        Changed the property/counter name in a new patch.

        Aniket Mokashi added a comment -

        Cheolsoo Park, how about taking an approach similar to MonitoredUDF? That way, instead of a common property for all sorts of errors, you could configure your own property inside your EvalFunc/LoadFunc with annotations, and Pig would kill the job if the UDF misbehaves (with respect to the contract of the UDF rather than the contract of the Pig installation, i.e., pig.properties).
        I have another use case that could utilize this framework (if we build one): assertions. I could annotate the assert UDF and kill the job immediately if an assertion fails.
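        As a rough illustration of that annotation-driven idea (entirely hypothetical; neither this annotation nor its enforcement exists in Pig):

```java
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStatusReporter;

// Hypothetical annotation in the spirit of MonitoredUDF: the UDF declares
// its own termination condition instead of one global Pig property.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface KillJobOnCounter {
    String group();
    String counter();
    long limit();
}

// Hypothetical usage: a launcher-side monitor (not shown) would read the
// annotation and kill the job once BAD_ROWS exceeds the declared limit.
@KillJobOnCounter(group = "myudf", counter = "BAD_ROWS", limit = 1000)
class StrictParse extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        Object v = (input == null || input.size() == 0) ? null : input.get(0);
        if (v == null) {
            Counter bad = PigStatusReporter.getInstance().getCounter("myudf", "BAD_ROWS");
            if (bad != null) {
                bad.increment(1);
            }
            return null;
        }
        return v.toString();
    }
}
```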

        Cheolsoo Park added a comment -

        Aniket Mokashi, your suggestion makes a lot of sense, and I like it. Let me think about this more. Canceling the patch for now.

        Aniket Mokashi added a comment -

        Cheolsoo Park, I am now attempting to solve this with an UnrecoverableException kind of approach (annotations only allow compile-time configuration). With that, UDFs can just throw PigBadDataException (sketched below), and you can configure how to handle it via configuration parameters, using counters.
        Do you have work in progress on this?
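        A sketch of the proposal from the UDF author's side (PigBadDataException is the class proposed above, not an existing Pig API):

```java
import java.io.IOException;

// Hypothetical unrecoverable-error exception, as proposed above. The idea:
// a UDF throws it on bad data, Pig counts the occurrences via a counter,
// and the launcher kills the job once a configured threshold is crossed.
public class PigBadDataException extends IOException {
    public PigBadDataException(String message) {
        super(message);
    }
}

// Hypothetical use inside a UDF's exec():
//   if (!isValid(row)) throw new PigBadDataException("corrupt row: " + row);
```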

        Cheolsoo Park added a comment -

        Aniket Mokashi, no, I am not working on this. Please feel free to take it over.


          People

          • Assignee: Cheolsoo Park
          • Reporter: Cheolsoo Park
          • Votes: 0
          • Watchers: 3