Index: src/java/org/apache/hcatalog/pig/HCatLoader.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatLoader.java	(revision 1304455)
+++ src/java/org/apache/hcatalog/pig/HCatLoader.java	(revision )
@@ -37,6 +37,7 @@
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.impl.util.UDFContext;
 
 /**
@@ -175,6 +176,22 @@
     // store this in the udf context so we can get it later
     storeInUDFContext(signature, PARTITION_FILTER, partitionFilterString);
+  }
+
+  /**
+   * Get statistics about the data to be loaded. Only input data size is implemented at this time.
+   */
+  @Override
+  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+    try {
+      ResourceStatistics stats = new ResourceStatistics();
+      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+          job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+      stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+      return stats;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   private String getPartitionFilterString() {
Index: src/test/org/apache/hcatalog/pig/TestHCatLoader.java
===================================================================
--- src/test/org/apache/hcatalog/pig/TestHCatLoader.java	(revision 1304455)
+++ src/test/org/apache/hcatalog/pig/TestHCatLoader.java	(revision )
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -34,10 +35,12 @@
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.HcatTestUtils;
 import org.apache.hcatalog.data.Pair;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -53,9 +56,10 @@
   private static final String BASIC_TABLE = "junit_unparted_basic";
   private static final String COMPLEX_TABLE = "junit_unparted_complex";
   private static final String PARTITIONED_TABLE = "junit_parted_basic";
+  private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
   private static Driver driver;
 
-  private static int guardTestCount = 5; // ugh, instantiate using introspection in guardedSetupBeforeClass
+  private static int guardTestCount = 6; // ugh, instantiate using introspection in guardedSetupBeforeClass
   private static boolean setupHasRun = false;
 
   private static Map<Integer, Pair<Integer, String>> basicInputData;
@@ -113,8 +117,8 @@
         + "phnos array<struct<phno:string,type:string>>");
     createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
+    createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
-
     int LOOP_SIZE = 3;
     String[] input = new String[LOOP_SIZE*LOOP_SIZE];
     basicInputData = new HashMap<Integer, Pair<Integer, String>>();
@@ -141,6 +145,7 @@
     server.registerQuery("A = load '"+BASIC_FILE_NAME+"' as (a:int, b:chararray);");
     server.registerQuery("store A into '"+BASIC_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hcatalog.pig.HCatStorer();");
     server.registerQuery("B = foreach A generate a,b;");
     server.registerQuery("B2 = filter B by a < 2;");
     server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer('bkt=0');");
@@ -158,6 +163,7 @@
     dropTable(BASIC_TABLE);
     dropTable(COMPLEX_TABLE);
     dropTable(PARTITIONED_TABLE);
+    dropTable(SPECIFIC_SIZE_TABLE);
   }
 
   protected void guardedTearDownAfterClass() throws Exception {
@@ -375,5 +381,19 @@
       numTuplesRead++;
     }
     assertEquals(basicInputData.size(),numTuplesRead);
+  }
+
+  public void testGetInputBytes() throws Exception {
+    File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+    file.deleteOnExit();
+    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+    randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+
+    Job job = new Job();
+    HCatLoader hCatLoader = new HCatLoader();
+    hCatLoader.setUDFContextSignature(this.getName());
+    hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+    ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+    assertEquals(2048, (long) statistics.getmBytes());
   }
 }
Index: src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatBaseLoader.java	(revision 1304455)
+++ src/java/org/apache/hcatalog/pig/HCatBaseLoader.java	(revision )
@@ -22,10 +22,15 @@
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
@@ -114,4 +119,31 @@
     props.put(key, value);
   }
 
+  /**
+   * A utility method to get the size of inputs. This is accomplished by summing the
+   * size of all input paths on supported FileSystems. Locations whose size cannot be
+   * determined are ignored. Note that non-FileSystem and unpartitioned locations will
+   * not report their input size by default.
+   */
+  protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
+    Configuration conf = new Configuration();
+    long sizeInBytes = 0;
+
+    for (PartInfo partInfo : inputJobInfo.getPartitions()) {
+      try {
+        Path p = new Path(partInfo.getLocation());
+        if (p.getFileSystem(conf).isFile(p)) {
+          sizeInBytes += p.getFileSystem(conf).getFileStatus(p).getLen();
+        } else {
+          for (FileStatus child : p.getFileSystem(conf).listStatus(p)) {
+            sizeInBytes += child.getLen();
+          }
+        }
+      } catch (IOException e) {
+        // Report size to the extent possible.
+      }
+    }
+
+    return sizeInBytes;
+  }
 }
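
For context on the HCatBaseLoader change above: getSizeInBytes() walks each partition location with the Hadoop FileSystem API, counting a lone file's own length, summing the immediate children of a directory, and silently skipping locations it cannot read. Below is a minimal standalone sketch of that same approach; the class name PartitionSizeSketch and the sample path are illustrative, not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper mirroring the patch's strategy: a file contributes its
    // own length, a directory contributes the lengths of its immediate children,
    // and unreadable locations contribute nothing (best-effort accounting).
    public class PartitionSizeSketch {

        public static long sizeOf(String location, Configuration conf) {
            long total = 0;
            try {
                Path path = new Path(location);
                FileSystem fs = path.getFileSystem(conf);
                if (fs.isFile(path)) {
                    total = fs.getFileStatus(path).getLen();
                } else {
                    for (FileStatus child : fs.listStatus(path)) {
                        total += child.getLen();
                    }
                }
            } catch (IOException e) {
                // Skip locations whose size cannot be determined, as the patch does.
            }
            return total;
        }

        public static void main(String[] args) {
            // Illustrative path only; point this at any local or HDFS directory.
            System.out.println(sizeOf("/tmp/junit_specific_size", new Configuration()) + " bytes");
        }
    }

Note that, like the patch, this counts only one level deep: files nested in subdirectories of a partition location would require a recursive listing to be included.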
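
One arithmetic detail worth calling out: ResourceStatistics.setmBytes() takes whole megabytes, and the integer division in getStatistics() truncates, so any input smaller than 1 MB reports a size of 0. A tiny self-contained check of the numbers the test asserts (plain Java, no HCatalog required):

    // Sketch of the MB conversion used in getStatistics().
    public class MBytesCheck {
        public static void main(String[] args) {
            long bytes = 2L * 1024 * 1024 * 1024;       // the 2 GB file testGetInputBytes creates
            System.out.println(bytes / 1024 / 1024);    // 2048, matching assertEquals(2048, ...)
            System.out.println(500000L / 1024 / 1024);  // 0: sub-megabyte inputs truncate to zero
        }
    }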