Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
0.8.1, 0.9.0, 0.10.0, 0.11
None
None
Environment:
PIG 0.9 branch
HBase 0.90.3
HDFS 0.20-append
Description
I run a set of PIG jobs from a Java process (using PigServer), most of which use HBaseStorage to load data from HBase.
Each job is run using a new PigServer object, and I correctly call PigServer.shutdown() when my pig server is no longer used.
Nevertheless, after the process has been running for a few hours, I notice that the number of connections to my ZooKeeper servers reaches the limit (300 in my case).
It appears that each job leaks 4 or 5 Zookeeper connections.
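For anyone trying to reproduce this, one way to watch the connection count grow is to ask a ZooKeeper server for its client list before and after each job. The sketch below is only an illustration: `ZkConnectionCounter` is a hypothetical helper, not part of PIG or HBase. It assumes the server answers the `stat` four-letter command (newer ZooKeeper releases may require it to be explicitly enabled) and that each client entry in the reply is a line starting with `/`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of PIG or HBase) for watching ZooKeeper client connections. */
final class ZkConnectionCounter {

    /** Sends the four-letter command "stat" to a ZooKeeper server and returns the raw reply. */
    static String fetchStat(String host, int port) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write("stat".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server closes the connection after replying, so read until end of stream.
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                reply.write(buffer, 0, n);
            }
            return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
        }
    }

    /** Counts client entries, assuming each one is a line that starts with "/" once trimmed. */
    static int countClients(String statReply) {
        int count = 0;
        for (String line : statReply.split("\n")) {
            if (line.trim().startsWith("/")) {
                count++;
            }
        }
        return count;
    }
}
```

Calling `ZkConnectionCounter.countClients(ZkConnectionCounter.fetchStat(host, port))` before and after a job, with `host` and `port` pointing at one of the ZooKeeper servers, should show whether the count keeps climbing. The probe's own socket may appear in the list, so the difference between two readings is more meaningful than the absolute number.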
This was not the case with PIG 0.6.1 + HBase 0.20.6.
As a temporary workaround, I kill the process running PIG after a few sets of jobs have been run; the connections are then correctly closed.
My process doesn't use HBase directly, only through HBaseStorage, so I guess the leak is in the HBaseStorage code: maybe the connections to HBase are not closed.
All my requests are simple requests loading data from HBase, like:
pigServer.registerQuery("start_sessions = LOAD '" + Analytics.getHBaseTableURL("startSession")
    + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:infoid meta:imei meta:timestamp') "
    + "AS (sid:chararray, infoid:chararray, imei:chararray, start:long);");
pigServer.registerQuery("end_sessions = LOAD '" + Analytics.getHBaseTableURL("endSession")
    + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:timestamp meta:locid') "
    + "AS (sid:chararray, end:long, locid:chararray);");
pigServer.registerQuery("sessions = JOIN start_sessions BY sid, end_sessions BY sid;");
pigServer.store("sessions", Analytics.getOutputFilePath("sessions"), "BinStorage");
Code used to allocate a new PIG server:
public static PigServer getNewPigServer() throws IOException {
    /* Get system properties */
    Properties properties = new Properties();

    /* Set specific Hadoop properties for PIG jobs */
    properties.setProperty("mapred.child.java.opts", "-Xmx" + childMemory + "m");

    /* Create PIG context */
    PigContext context = new PigContext(local ? ExecType.LOCAL : ExecType.MAPREDUCE, properties);

    /* Create the PIG server */
    PigServer pigServer = new PigServer(context);

    /* Register our User Defined Functions (UDFs) */
    pigServer.registerJar(pigUdfsPath);

    /* Register shortcuts for our UDFs */
    pigServer.registerFunction("GetActivitiesLengthsRanges", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetActivitiesLengthsRanges"));
    pigServer.registerFunction("GetActivitiesLinks", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetActivitiesLinks"));
    pigServer.registerFunction("GetActivitiesPeriodsAndLengths", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetActivitiesPeriodsAndLengths"));
    pigServer.registerFunction("GetCountRange", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetCountRange"));
    pigServer.registerFunction("GetAllPeriods", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetAllPeriods"));
    pigServer.registerFunction("GetCountRangeLabel", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetCountRangeLabel"));
    pigServer.registerFunction("GetCountsAndLengthsByName", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetCountsAndLengthsByName"));
    pigServer.registerFunction("GetCountsByName", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetCountsByName"));
    pigServer.registerFunction("GetDayPeriod", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetDayPeriod"));
    pigServer.registerFunction("GetDayWeekMonthPeriods", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetDayWeekMonthPeriods"));
    pigServer.registerFunction("GetLengthRange", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetLengthRange"));
    pigServer.registerFunction("GetLengthRangeLabel", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetLengthRangeLabel"));
    pigServer.registerFunction("GetPeriods", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetPeriods"));
    pigServer.registerFunction("GetPeriodsAndLengths", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GetPeriodsAndLengths"));
    pigServer.registerFunction("NormalizeCarrierName", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeCarrierName"));
    pigServer.registerFunction("NormalizeCountryCode", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeCountryCode"));
    pigServer.registerFunction("NormalizeLocaleCode", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeLocaleCode"));
    pigServer.registerFunction("NormalizeNetworkType", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkType"));
    pigServer.registerFunction("NormalizeNetworkSubType", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkSubType"));
    pigServer.registerFunction("NormalizePhoneManufacturer", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizePhoneManufacturer"));
    pigServer.registerFunction("NormalizePhoneModel", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizePhoneModel"));
    pigServer.registerFunction("NormalizeString", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.NormalizeString"));
    pigServer.registerFunction("SubString", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.SubString"));
    pigServer.registerFunction("GuessCountryCode", new FuncSpec(
        "com.ubikod.ermin.analytics.pigudf.GuessCountryCode"));

    /* Return this new instance of PIG server */
    return pigServer;
}
Code used when the PIG server is no longer used:
pigServer.shutdown();
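To rule out the possibility that shutdown() is skipped when a job throws, the call can be tied to a try block. The sketch below is an assumption-laden illustration, not the reporter's code: `ShutdownOnExit` and `ShutdownDemo` are hypothetical names, the wrapper takes any `Runnable` so it compiles without PIG on the classpath, and it needs Java 8 or later for the lambda. It only guarantees that the cleanup action runs; it does not address a leak inside HBaseStorage itself.

```java
/** Hypothetical wrapper (not part of PIG): runs a cleanup action when the try block exits. */
final class ShutdownOnExit implements AutoCloseable {
    private final Runnable shutdown;

    ShutdownOnExit(Runnable shutdown) {
        this.shutdown = shutdown;
    }

    @Override
    public void close() {
        // Invoked on both normal and exceptional exit from the try block.
        shutdown.run();
    }
}

/** Small demonstration that the cleanup action runs even if the job fails. */
final class ShutdownDemo {
    static int shutdownCalls = 0;

    static void runJob(boolean fail) {
        // With PIG this would be: new ShutdownOnExit(pigServer::shutdown)
        try (ShutdownOnExit guard = new ShutdownOnExit(() -> shutdownCalls++)) {
            if (fail) {
                throw new IllegalStateException("job failed");
            }
        }
    }
}
```

If the connection count still grows with a guard like this in place, a missed shutdown() call is unlikely to be the cause.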