Pig
  1. Pig
  2. PIG-3669

Wrong Usage of Scalar can bring down Namenode

    Details

    • Type: Bug Bug
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      People often get confused and end up using . after a join instead of :: (PIG-2134) to reference fields of the join. Pig takes it to be a scalar and tries to read the whole join data assuming there is only one record and then fails with "scalar has more than one row in the output". ReadScalars uses InterStorage to read the scalar data. It uses InterInputFormat which extends FileInputFormat and for getSplits(), there is a listStatus on all the files in the dir to construct the splits and then InterRecordReader is used on the splits to read the data. When the data is really huge with lot of files, this can cause the namenode to go out of memory and crash and especially with the recent optimization change in hadoop 0.23/2.x that uses listLocatedStatus instead of listStatus + getBlockLocations in FileInputFormat to reduce number of calls to Namenode.

      In the particular case we encountered, the join output had 6.5K files. On the job that did ReadScalars, it had 8K+ tasks (with speculative execution) and all of them doing listStatus for the 6.5K files caused NN queue to fill up with huge responses (block locations makes the response even bigger). It also saturated the network, causing responses to build up in NN without being sent finally leading it to crash with OOM.

        Issue Links

          Activity

          Rohini Palaniswamy created issue -
          Rohini Palaniswamy made changes -
          Field Original Value New Value
          Link This issue is related to PIG-2629 [ PIG-2629 ]
          Hide
          Rohini Palaniswamy added a comment -

          It would be good to have a different syntax for scalar. But not sure how this can be done without breaking backward compatibility.

          The other solution that I could think of to mitigate the problem was to validate all the ReadScalars UDF in the plan by executing them in the OutputCommitter's setupJob(). The setupJob() runs once for the job before the map and reduce tasks are launched. It runs as a separate task in Hadoop 1.x and is run as part of the AM in 2.x. If there is more than 1 record, then the job would fail in the setup phase itself, thus avoiding doing listStatus and loading on all the map or reduce tasks. If it indeed was a scalar, we have an extra load in the setup phase but the cost is minimal as that should at max be 1 file or 1 file with data and other being empty files.

          Any other good ideas are welcome.

          Show
          Rohini Palaniswamy added a comment - It would be good to have a different syntax for scalar. But not sure how this can be done without breaking backward compatibility. The other solution that I could think of to mitigate the problem was to validate all the ReadScalars UDF in the plan by executing them in the OutputCommitter's setupJob(). The setupJob() runs once for the job before the map and reduce tasks are launched. It runs as a separate task in Hadoop 1.x and is run as part of the AM in 2.x. If there is more than 1 record, then the job would fail in the setup phase itself, thus avoiding doing listStatus and loading on all the map or reduce tasks. If it indeed was a scalar, we have an extra load in the setup phase but the cost is minimal as that should at max be 1 file or 1 file with data and other being empty files. Any other good ideas are welcome.

            People

            • Assignee:
              Unassigned
              Reporter:
              Rohini Palaniswamy
            • Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

              • Created:
                Updated:

                Development