Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1068610) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -80,6 +80,8 @@ import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -106,6 +108,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -1438,8 +1441,19 @@ ContentSummary cs = ctx.getCS(path); if (cs == null) { - FileSystem fs = p.getFileSystem(ctx.getConf()); - cs = fs.getContentSummary(p); + JobConf jobConf = new JobConf(ctx.getConf()); + PartitionDesc partDesc = work.getPathToPartitionInfo().get( + p.toString()); + Class inputFormatCls = partDesc + .getInputFileFormatClass(); + InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( + inputFormatCls, jobConf); + if(inputFormatObj instanceof ContentSummaryInputFormat) { + cs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p, jobConf); + } else { + FileSystem fs = p.getFileSystem(ctx.getConf()); + cs = fs.getContentSummary(p); + } ctx.addCS(path, cs); } Index: ql/src/java/org/apache/hadoop/hive/ql/io/ContentSummaryInputFormat.java =================================================================== --- 
ql/src/java/org/apache/hadoop/hive/ql/io/ContentSummaryInputFormat.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/io/ContentSummaryInputFormat.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +import java.io.IOException; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; + +/** + * ContentSummaryInputFormat provides an interface to let the input format itself + * figure the content summary for a given input path. 
+ */ +public interface ContentSummaryInputFormat { + + public ContentSummary getContentSummary(Path p, JobConf job) + throws IOException; + +} Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (revision 1068610) +++ ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (working copy) @@ -9,6 +9,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,7 +33,7 @@ */ @SuppressWarnings("deprecation") public class SymlinkTextInputFormat - implements InputFormat, JobConfigurable { + implements InputFormat, JobConfigurable, ContentSummaryInputFormat { /** * This input split wraps the FileSplit generated from * TextInputFormat.getSplits(), while setting the original link file path @@ -181,4 +182,31 @@ public void validateInput(JobConf job) throws IOException { // do nothing } + + @Override + public ContentSummary getContentSummary(Path p, JobConf job) + throws IOException { + //length, file count, directory count + long[] summary = {0, 0, 0}; + List targetPaths = new ArrayList(); + List symlinkPaths = new ArrayList(); + try { + getTargetPathsFromSymlinksDirs( + job, + new Path[]{p}, + targetPaths, + symlinkPaths); + } catch (Exception e) { + throw new IOException( + "Error parsing symlinks from specified job input path.", e); + } + for(Path path : targetPaths) { + FileSystem fs = path.getFileSystem(job); + ContentSummary cs = fs.getContentSummary(path); + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); + } + return new ContentSummary(summary[0], summary[1], summary[2]); + } } Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java 
=================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (revision 1068610) +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (working copy) @@ -10,6 +10,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -64,31 +65,53 @@ */ public void testAccuracy1() throws IOException { // First data dir, contains 2 files. - writeTextFile(new Path(dataDir1, "file1"), + + FileSystem fs = dataDir1.getFileSystem(job); + int symbolLinkedFileSize = 0; + + Path dir1_file1 = new Path(dataDir1, "file1"); + writeTextFile(dir1_file1, "dir1_file1_line1\n" + "dir1_file1_line2\n"); - writeTextFile(new Path(dataDir1, "file2"), + + symbolLinkedFileSize += fs.getFileStatus(dir1_file1).getLen(); + + Path dir1_file2 = new Path(dataDir1, "file2"); + writeTextFile(dir1_file2, "dir1_file2_line1\n" + "dir1_file2_line2\n"); - + // Second data dir, contains 2 files. - writeTextFile(new Path(dataDir2, "file1"), + + Path dir2_file1 = new Path(dataDir2, "file1"); + writeTextFile(dir2_file1, "dir2_file1_line1\n" + "dir2_file1_line2\n"); - writeTextFile(new Path(dataDir2, "file2"), + + Path dir2_file2 = new Path(dataDir2, "file2"); + writeTextFile(dir2_file2, "dir2_file2_line1\n" + "dir2_file2_line2\n"); + symbolLinkedFileSize += fs.getFileStatus(dir2_file2).getLen(); + // A symlink file, contains first file from first dir and second file from // second dir. 
writeSymlinkFile( new Path(symlinkDir, "symlink_file"), new Path(dataDir1, "file1"), new Path(dataDir2, "file2")); + + SymlinkTextInputFormat inputFormat = new SymlinkTextInputFormat(); + + //test content summary + ContentSummary cs = inputFormat.getContentSummary(symlinkDir, job); + + assertEquals(symbolLinkedFileSize, cs.getLength()); + assertEquals(2, cs.getFileCount()); + assertEquals(0, cs.getDirectoryCount()); FileInputFormat.setInputPaths(job, symlinkDir); - - SymlinkTextInputFormat inputFormat = new SymlinkTextInputFormat(); InputSplit[] splits = inputFormat.getSplits(job, 2); log.info("Number of splits: " + splits.length); @@ -126,6 +149,13 @@ FileInputFormat.setInputPaths(job, symlinkDir); SymlinkTextInputFormat inputFormat = new SymlinkTextInputFormat(); + + ContentSummary cs = inputFormat.getContentSummary(symlinkDir, job); + + assertEquals(0, cs.getLength()); + assertEquals(0, cs.getFileCount()); + assertEquals(0, cs.getDirectoryCount()); + InputSplit[] splits = inputFormat.getSplits(job, 2); log.info("Number of splits: " + splits.length);