Uploaded image for project: 'Pig'
  1. Pig
  2. PIG-2251

PIG leaks Zookeeper connections when using HBaseStorage

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 0.8.1, 0.9.0, 0.10.0, 0.11
    • 0.12.0
    • None
    • None
    • PIG 0.9 branch
      HBase 0.90.3
      HDFS 0.20-append

    Description

      I run a set of PIG jobs from a Java process (using PigServer). Most of which use HBaseStorage to load data from HBase.
      Each job is run using a new PigServer object, and I correctly call PigServer.shutdown() when my pig server is no longer used.

      Nevertheless, after a few hours of run, I notice that the number of connections to my Zookeeper servers reach the limit (300 in my case).
      It appears that each job leaks 4 or 5 Zookeeper connections.

      It was not the case with PIG 0.6.1 + HBase 0.20.6

      To solve this issue (temporarily) by killing the process running PIG after a few set of jobs have been run : connections are correctly closed.
      My process don't use HBase by itself, only HBaseStorage, so I guess the leak is in the code of HBaseStorage: maybe to cnx to HBase are not closed.

      All my request are simple request loading data from HBase, lik:

          pigServer.registerQuery("start_sessions = LOAD '"
              + Analytics.getHBaseTableURL("startSession")
              + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:infoid meta:imei meta:timestamp') "
              + "AS (sid:chararray, infoid:chararray, imei:chararray, start:long);");
      
          pigServer.registerQuery("end_sessions = LOAD '"
              + Analytics.getHBaseTableURL("endSession")
              + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:timestamp meta:locid') "
              + "AS (sid:chararray, end:long, locid:chararray);");
      
          pigServer.registerQuery("sessions = JOIN start_sessions BY sid, end_sessions BY sid;");
      
          pigServer.store("sessions", Analytics.getOutputFilePath("sessions"), "BinStorage");
      
      
      

      Code used to allocate a new PIG server:

        public static PigServer getNewPigServer() throws IOException
        {
          /* Get system properties */
          Properties properties = new Properties();
      
          /* Set specific Hadoop properties for PIG jobs */
          properties.setProperty("mapred.child.java.opts", "-Xmx" + childMemory + "m");
      
          /* Create PIG context */
          PigContext context = new PigContext(local ? ExecType.LOCAL : ExecType.MAPREDUCE, properties);
      
          /* Create the PIG server */
          PigServer pigServer = new PigServer(context);
      
          /* Register our User Defined Functions (UDFs) */
          pigServer.registerJar(pigUdfsPath);
      
          /* Register shortcuts for our UDFs */
          pigServer.registerFunction("GetActivitiesLengthsRanges", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetActivitiesLengthsRanges"));
          pigServer.registerFunction("GetActivitiesLinks", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetActivitiesLinks"));
          pigServer.registerFunction("GetActivitiesPeriodsAndLengths", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetActivitiesPeriodsAndLengths"));
          pigServer.registerFunction("GetCountRange", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetCountRange"));
          pigServer.registerFunction("GetAllPeriods", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetAllPeriods"));
          pigServer.registerFunction("GetCountRangeLabel", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetCountRangeLabel"));
          pigServer.registerFunction("GetCountsAndLengthsByName", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetCountsAndLengthsByName"));
          pigServer.registerFunction("GetCountsByName", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetCountsByName"));
          pigServer.registerFunction("GetDayPeriod", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetDayPeriod"));
          pigServer.registerFunction("GetDayWeekMonthPeriods", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetDayWeekMonthPeriods"));
          pigServer.registerFunction("GetLengthRange", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetLengthRange"));
          pigServer.registerFunction("GetLengthRangeLabel", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetLengthRangeLabel"));
          pigServer.registerFunction("GetPeriods", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetPeriods"));
          pigServer.registerFunction("GetPeriodsAndLengths", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GetPeriodsAndLengths"));
          pigServer.registerFunction("NormalizeCarrierName", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeCarrierName"));
          pigServer.registerFunction("NormalizeCountryCode", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeCountryCode"));
          pigServer.registerFunction("NormalizeLocaleCode", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeLocaleCode"));
          pigServer.registerFunction("NormalizeNetworkType", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkType"));
          pigServer.registerFunction("NormalizeNetworkSubType", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkSubType"));
          pigServer.registerFunction("NormalizePhoneManufacturer", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizePhoneManufacturer"));
          pigServer.registerFunction("NormalizePhoneModel", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizePhoneModel"));
          pigServer.registerFunction("NormalizeString", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.NormalizeString"));
          pigServer.registerFunction("SubString", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.SubString"));
          pigServer.registerFunction("GuessCountryCode", new FuncSpec(
            "com.ubikod.ermin.analytics.pigudf.GuessCountryCode"));
      
          /* Return this new instance of PIG server */
          return pigServer;
        }
      

      Code used when PIG server no longer used:

          pigServer.shutdown();
      

      Attachments

        1. PIG-2251.patch
          1 kB
          Jeff Markham

        Activity

          People

            jamarkha Jeff Markham
            vbarat Vincent BARAT
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: