Index: build-common.xml =================================================================== --- build-common.xml (revision 899787) +++ build-common.xml (working copy) @@ -295,21 +295,25 @@ --> - - - + + + + + + + + - + + - - - + Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 899787) +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy) @@ -72,6 +72,7 @@ POSTEXECHOOKS("hive.exec.post.hooks", ""), EXECPARALLEL("hive.exec.parallel",false), // parallel query launching HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution",true), + HIVEFETCHFILENAMEFILTER("hive.exec.fetch.filename.filter", ""), // hadoop stuff HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"), Index: metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java =================================================================== --- metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java (revision 899787) +++ metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java (working copy) @@ -24,6 +24,7 @@ public class MStorageDescriptor { private List cols; private String location; + private String filenameFilter; private String inputFormat; private String outputFormat; private boolean isCompressed = false; @@ -39,6 +40,7 @@ /** * @param cols * @param location + * @param filenameFilter * @param inputFormat * @param outputFormat * @param isCompressed @@ -48,11 +50,12 @@ * @param sortOrder * @param parameters */ - public MStorageDescriptor(List cols, String location, String inputFormat, + public MStorageDescriptor(List cols, String location, String filenameFilter, String inputFormat, String outputFormat, boolean isCompressed, int numBuckets, MSerDeInfo serDeInfo, List bucketCols, List sortOrder, Map parameters) { this.cols = cols; this.location = location; + this.filenameFilter = filenameFilter; this.inputFormat = inputFormat; this.outputFormat = outputFormat; this.isCompressed = isCompressed; @@ -77,8 +80,22 @@ public void setLocation(String location) { this.location = location; } + + /** + * @return the filename filter + */ + public String getFilenameFilter() { + return filenameFilter; + } /** + * @param filenameFilter the filename filter to set + */ + public void setFilenameFilter(String filenameFilter) { + this.filenameFilter = filenameFilter; + } + + /** * @return the isCompressed */ public boolean isCompressed() { Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 899787) +++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy) @@ -657,6 +657,7 @@ return new StorageDescriptor( convertToFieldSchemas(msd.getCols()), msd.getLocation(), + msd.getFilenameFilter(), msd.getInputFormat(), msd.getOutputFormat(), msd.isCompressed(), @@ -672,6 +673,7 @@ return new MStorageDescriptor( convertToMFieldSchemas(sd.getCols()), sd.getLocation(), + sd.getFilenameFilter(), sd.getInputFormat(), sd.getOutputFormat(), sd.isCompressed(), Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java =================================================================== --- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java 
(revision 899787) +++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (working copy) @@ -141,7 +141,7 @@ * @return the Deserializer * @exception MetaException if any problems instantiating the Deserializer * - * todo - this should move somewhere into serde.jar + * TODO - this should move somewhere into serde.jar * */ static public Deserializer getDeserializer(Configuration conf, Properties schema) throws MetaException { @@ -167,7 +167,7 @@ * @return the Deserializer * @exception MetaException if any problems instantiating the Deserializer * - * todo - this should move somewhere into serde.jar + * TODO - this should move somewhere into serde.jar * */ static public Deserializer getDeserializer(Configuration conf, org.apache.hadoop.hive.metastore.api.Table table) throws MetaException { @@ -297,6 +297,7 @@ t.setSd(new StorageDescriptor()); t.setTableName(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME)); t.getSd().setLocation(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_LOCATION)); + t.getSd().setFilenameFilter(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_FILENAME_FILTER)); t.getSd().setInputFormat(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_INPUT_FORMAT, org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName())); t.getSd().setOutputFormat(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_OUTPUT_FORMAT, @@ -513,6 +514,9 @@ if(sd.getLocation() != null) { schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_LOCATION, sd.getLocation()); } + if (sd.getFilenameFilter() != null) { + schema.setProperty(Constants.META_TABLE_FILENAME_FILTER, sd.getFilenameFilter()); + } schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.BUCKET_COUNT, Integer.toString(sd.getNumBuckets())); if (sd.getBucketCols().size() > 0) { schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.BUCKET_FIELD_NAME, sd.getBucketCols().get(0)); Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java =================================================================== --- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (revision 899787) +++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (working copy) @@ -22,35 +22,38 @@ private static final TStruct STRUCT_DESC = new TStruct("StorageDescriptor"); private static final TField COLS_FIELD_DESC = new TField("cols", TType.LIST, (short)1); private static final TField LOCATION_FIELD_DESC = new TField("location", TType.STRING, (short)2); - private static final TField INPUT_FORMAT_FIELD_DESC = new TField("inputFormat", TType.STRING, (short)3); - private static final TField OUTPUT_FORMAT_FIELD_DESC = new TField("outputFormat", TType.STRING, (short)4); - private static final TField COMPRESSED_FIELD_DESC = new TField("compressed", TType.BOOL, (short)5); - private static final TField NUM_BUCKETS_FIELD_DESC = new TField("numBuckets", TType.I32, (short)6); - private static final TField SERDE_INFO_FIELD_DESC = new TField("serdeInfo", TType.STRUCT, (short)7); - private static final TField BUCKET_COLS_FIELD_DESC = new TField("bucketCols", TType.LIST, (short)8); - private static final TField SORT_COLS_FIELD_DESC = new TField("sortCols", TType.LIST, (short)9); - private static final TField PARAMETERS_FIELD_DESC = new TField("parameters", TType.MAP, (short)10); + private 
static final TField FILENAME_FILTER_FIELD_DESC = new TField("filenameFilter", TType.STRING, (short)3); + private static final TField INPUT_FORMAT_FIELD_DESC = new TField("inputFormat", TType.STRING, (short)4); + private static final TField OUTPUT_FORMAT_FIELD_DESC = new TField("outputFormat", TType.STRING, (short)5); + private static final TField COMPRESSED_FIELD_DESC = new TField("compressed", TType.BOOL, (short)6); + private static final TField NUM_BUCKETS_FIELD_DESC = new TField("numBuckets", TType.I32, (short)7); + private static final TField SERDE_INFO_FIELD_DESC = new TField("serdeInfo", TType.STRUCT, (short)8); + private static final TField BUCKET_COLS_FIELD_DESC = new TField("bucketCols", TType.LIST, (short)9); + private static final TField SORT_COLS_FIELD_DESC = new TField("sortCols", TType.LIST, (short)10); + private static final TField PARAMETERS_FIELD_DESC = new TField("parameters", TType.MAP, (short)11); private List cols; public static final int COLS = 1; private String location; public static final int LOCATION = 2; + private String filenameFilter; + public static final int FILENAMEFILTER = 3; private String inputFormat; - public static final int INPUTFORMAT = 3; + public static final int INPUTFORMAT = 4; private String outputFormat; - public static final int OUTPUTFORMAT = 4; + public static final int OUTPUTFORMAT = 5; private boolean compressed; - public static final int COMPRESSED = 5; + public static final int COMPRESSED = 6; private int numBuckets; - public static final int NUMBUCKETS = 6; + public static final int NUMBUCKETS = 7; private SerDeInfo serdeInfo; - public static final int SERDEINFO = 7; + public static final int SERDEINFO = 8; private List bucketCols; - public static final int BUCKETCOLS = 8; + public static final int BUCKETCOLS = 9; private List sortCols; - public static final int SORTCOLS = 9; + public static final int SORTCOLS = 10; private Map parameters; - public static final int PARAMETERS = 10; + public static final int PARAMETERS = 11; private final Isset __isset = new Isset(); private static final class Isset implements java.io.Serializable { @@ -64,6 +67,8 @@ new StructMetaData(TType.STRUCT, FieldSchema.class)))); put(LOCATION, new FieldMetaData("location", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.STRING))); + put(FILENAMEFILTER, new FieldMetaData("filenameFilter", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.STRING))); put(INPUTFORMAT, new FieldMetaData("inputFormat", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.STRING))); put(OUTPUTFORMAT, new FieldMetaData("outputFormat", TFieldRequirementType.DEFAULT, @@ -96,6 +101,7 @@ public StorageDescriptor( List cols, String location, + String filenameFilter, String inputFormat, String outputFormat, boolean compressed, @@ -108,6 +114,7 @@ this(); this.cols = cols; this.location = location; + this.filenameFilter = filenameFilter; this.inputFormat = inputFormat; this.outputFormat = outputFormat; this.compressed = compressed; @@ -134,6 +141,9 @@ if (other.isSetLocation()) { this.location = other.location; } + if (other.isSetFilenameFilter()) { + this.filenameFilter = other.filenameFilter; + } if (other.isSetInputFormat()) { this.inputFormat = other.inputFormat; } @@ -232,6 +242,23 @@ return this.location != null; } + public String getFilenameFilter() { + return this.filenameFilter; + } + + public void setFilenameFilter(String filenameFilter) { + this.filenameFilter = filenameFilter; + } + + public void unsetFilenameFilter() { + this.filenameFilter = null; + 
} + + // Returns true if field filenameFilter is set (has been asigned a value) and false otherwise + public boolean isSetFilenameFilter() { + return this.filenameFilter != null; + } + public String getInputFormat() { return this.inputFormat; } @@ -429,6 +456,14 @@ } break; + case FILENAMEFILTER: + if (value == null) { + unsetFilenameFilter(); + } else { + setFilenameFilter((String)value); + } + break; + case INPUTFORMAT: if (value == null) { unsetInputFormat(); @@ -506,6 +541,9 @@ case LOCATION: return getLocation(); + case FILENAMEFILTER: + return getFilenameFilter(); + case INPUTFORMAT: return getInputFormat(); @@ -542,6 +580,8 @@ return isSetCols(); case LOCATION: return isSetLocation(); + case FILENAMEFILTER: + return isSetFilenameFilter(); case INPUTFORMAT: return isSetInputFormat(); case OUTPUTFORMAT: @@ -594,6 +634,15 @@ return false; } + boolean this_present_filenameFilter = true && this.isSetFilenameFilter(); + boolean that_present_filenameFilter = true && that.isSetFilenameFilter(); + if (this_present_filenameFilter || that_present_filenameFilter) { + if (!(this_present_filenameFilter && that_present_filenameFilter)) + return false; + if (!this.filenameFilter.equals(that.filenameFilter)) + return false; + } + boolean this_present_inputFormat = true && this.isSetInputFormat(); boolean that_present_inputFormat = true && that.isSetInputFormat(); if (this_present_inputFormat || that_present_inputFormat) { @@ -710,6 +759,13 @@ TProtocolUtil.skip(iprot, field.type); } break; + case FILENAMEFILTER: + if (field.type == TType.STRING) { + this.filenameFilter = iprot.readString(); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; case INPUTFORMAT: if (field.type == TType.STRING) { this.inputFormat = iprot.readString(); @@ -833,6 +889,11 @@ oprot.writeString(this.location); oprot.writeFieldEnd(); } + if (this.filenameFilter != null) { + oprot.writeFieldBegin(FILENAME_FILTER_FIELD_DESC); + oprot.writeString(this.filenameFilter); + oprot.writeFieldEnd(); + } if (this.inputFormat != null) { oprot.writeFieldBegin(INPUT_FORMAT_FIELD_DESC); oprot.writeString(this.inputFormat); @@ -913,6 +974,14 @@ } first = false; if (!first) sb.append(", "); + sb.append("filenameFilter:"); + if (this.filenameFilter == null) { + sb.append("null"); + } else { + sb.append(this.filenameFilter); + } + first = false; + if (!first) sb.append(", "); sb.append("inputFormat:"); if (this.inputFormat == null) { sb.append("null"); Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java =================================================================== --- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java (revision 899787) +++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java (working copy) @@ -34,6 +34,8 @@ public static final String META_TABLE_LOCATION = "location"; + public static final String META_TABLE_FILENAME_FILTER = "filename_filter"; + public static final String META_TABLE_SERDE = "serde"; public static final String META_TABLE_PARTITION_COLUMNS = "partition_columns"; Index: metastore/if/hive_metastore.thrift =================================================================== --- metastore/if/hive_metastore.thrift (revision 899787) +++ metastore/if/hive_metastore.thrift (working copy) @@ -52,14 +52,15 @@ struct StorageDescriptor { 1: list cols, // required (refer to types defined above) 2: string location, // defaults to //tablename - 3: string inputFormat, // SequenceFileInputFormat (binary) or TextInputFormat` or 
custom format - 4: string outputFormat, // SequenceFileOutputFormat (binary) or IgnoreKeyTextOutputFormat or custom format - 5: bool compressed, // compressed or not - 6: i32 numBuckets, // this must be specified if there are any dimension columns - 7: SerDeInfo serdeInfo, // serialization and deserialization information - 8: list bucketCols, // reducer grouping columns and clustering columns and bucketing columns` - 9: list sortCols, // sort order of the data in each bucket - 10: map parameters // any user supplied key value hash + 3: string filenameFilter, // filename regular expression filter + 4: string inputFormat, // SequenceFileInputFormat (binary) or TextInputFormat` or custom format + 5: string outputFormat, // SequenceFileOutputFormat (binary) or IgnoreKeyTextOutputFormat or custom format + 6: bool compressed, // compressed or not + 7: i32 numBuckets, // this must be specified if there are any dimension columns + 8: SerDeInfo serdeInfo, // serialization and deserialization information + 9: list bucketCols, // reducer grouping columns and clustering columns and bucketing columns` + 10: list sortCols, // sort order of the data in each bucket + 11: map parameters // any user supplied key value hash } // table information @@ -224,6 +225,7 @@ const string META_TABLE_NAME = "name", const string META_TABLE_DB = "db", const string META_TABLE_LOCATION = "location", +const string META_TABLE_FILENAME_FILTER = "filename_filter", const string META_TABLE_SERDE = "serde", const string META_TABLE_PARTITION_COLUMNS = "partition_columns", const string FILE_INPUT_FORMAT = "file.inputformat", Index: data/files/ext_regex/a_regex.txt =================================================================== --- data/files/ext_regex/a_regex.txt (revision 0) +++ data/files/ext_regex/a_regex.txt (revision 0) @@ -0,0 +1,10 @@ +1|a_regex.txt|Lowell +2|a_regex.txt|Worcester +3|a_regex.txt|Roxbury +4|a_regex.txt|Dedham +5|a_regex.txt|Mattappan +6|a_regex.txt|Leominster +7|a_regex.txt|Shirley +8|a_regex.txt|Quincy +9|a_regex.txt|Brookline +10|a_regex.txt|Brockton Index: data/files/ext_regex/b_regex.txt =================================================================== --- data/files/ext_regex/b_regex.txt (revision 0) +++ data/files/ext_regex/b_regex.txt (revision 0) @@ -0,0 +1,10 @@ +1|b_regex.txt|Lowell +2|b_regex.txt|Worcester +3|b_regex.txt|Roxbury +4|b_regex.txt|Dedham +5|b_regex.txt|Mattappan +6|b_regex.txt|Leominster +7|b_regex.txt|Shirley +8|b_regex.txt|Quincy +9|b_regex.txt|Brookline +10|b_regex.txt|Brockton Index: data/files/ext_regex/c_regex.txt =================================================================== --- data/files/ext_regex/c_regex.txt (revision 0) +++ data/files/ext_regex/c_regex.txt (revision 0) @@ -0,0 +1,10 @@ +1|c_regex.txt|Lowell +2|c_regex.txt|Worcester +3|c_regex.txt|Roxbury +4|c_regex.txt|Dedham +5|c_regex.txt|Mattappan +6|c_regex.txt|Leominster +7|c_regex.txt|Shirley +8|c_regex.txt|Quincy +9|c_regex.txt|Brookline +10|c_regex.txt|Brockton Index: ql/src/test/results/clientpositive/external_filename_filter.q.out =================================================================== --- ql/src/test/results/clientpositive/external_filename_filter.q.out (revision 0) +++ ql/src/test/results/clientpositive/external_filename_filter.q.out (revision 0) @@ -0,0 +1,306 @@ +PREHOOK: query: DROP TABLE tab_abc +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_abc +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE tab_abc ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT 
DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "file:////Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE tab_abc ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "file:////Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tab_abc +PREHOOK: query: DROP TABLE tab_a +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_a +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE tab_a ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^a.*' +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE tab_a ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^a.*' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tab_a +PREHOOK: query: DROP TABLE tab_b +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_b +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE tab_b ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^b.*' +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE tab_b ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^b.*' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tab_b +PREHOOK: query: DROP TABLE tab_c +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_c +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE tab_c ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^c.*' +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE tab_c ( + id INT, + file STRING, + name STRING + ) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +STORED AS TEXTFILE +LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex" +FILENAME_FILTER '^c.*' +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tab_c +PREHOOK: query: SELECT id, file, name from tab_abc +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_abc +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/182053365/10000 +POSTHOOK: query: SELECT id, file, name from tab_abc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_abc +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/182053365/10000 +1 a_regex.txt Lowell +2 a_regex.txt Worcester +3 a_regex.txt Roxbury +4 a_regex.txt Dedham +5 a_regex.txt Mattappan +6 a_regex.txt Leominster +7 a_regex.txt Shirley +8 a_regex.txt Quincy +9 a_regex.txt Brookline +10 
a_regex.txt Brockton +1 b_regex.txt Lowell +2 b_regex.txt Worcester +3 b_regex.txt Roxbury +4 b_regex.txt Dedham +5 b_regex.txt Mattappan +6 b_regex.txt Leominster +7 b_regex.txt Shirley +8 b_regex.txt Quincy +9 b_regex.txt Brookline +10 b_regex.txt Brockton +1 c_regex.txt Lowell +2 c_regex.txt Worcester +3 c_regex.txt Roxbury +4 c_regex.txt Dedham +5 c_regex.txt Mattappan +6 c_regex.txt Leominster +7 c_regex.txt Shirley +8 c_regex.txt Quincy +9 c_regex.txt Brookline +10 c_regex.txt Brockton +PREHOOK: query: SELECT * FROM tab_abc +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_abc +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/128367836/10000 +POSTHOOK: query: SELECT * FROM tab_abc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_abc +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/128367836/10000 +1 a_regex.txt Lowell +2 a_regex.txt Worcester +3 a_regex.txt Roxbury +4 a_regex.txt Dedham +5 a_regex.txt Mattappan +6 a_regex.txt Leominster +7 a_regex.txt Shirley +8 a_regex.txt Quincy +9 a_regex.txt Brookline +10 a_regex.txt Brockton +1 b_regex.txt Lowell +2 b_regex.txt Worcester +3 b_regex.txt Roxbury +4 b_regex.txt Dedham +5 b_regex.txt Mattappan +6 b_regex.txt Leominster +7 b_regex.txt Shirley +8 b_regex.txt Quincy +9 b_regex.txt Brookline +10 b_regex.txt Brockton +1 c_regex.txt Lowell +2 c_regex.txt Worcester +3 c_regex.txt Roxbury +4 c_regex.txt Dedham +5 c_regex.txt Mattappan +6 c_regex.txt Leominster +7 c_regex.txt Shirley +8 c_regex.txt Quincy +9 c_regex.txt Brookline +10 c_regex.txt Brockton +PREHOOK: query: SELECT id, file, name from tab_a +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_a +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1897634501/10000 +POSTHOOK: query: SELECT id, file, name from tab_a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_a +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1897634501/10000 +1 a_regex.txt Lowell +2 a_regex.txt Worcester +3 a_regex.txt Roxbury +4 a_regex.txt Dedham +5 a_regex.txt Mattappan +6 a_regex.txt Leominster +7 a_regex.txt Shirley +8 a_regex.txt Quincy +9 a_regex.txt Brookline +10 a_regex.txt Brockton +PREHOOK: query: SELECT * from tab_a +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_a +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1968978404/10000 +POSTHOOK: query: SELECT * from tab_a +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_a +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1968978404/10000 +1 a_regex.txt Lowell +2 a_regex.txt Worcester +3 a_regex.txt Roxbury +4 a_regex.txt Dedham +5 a_regex.txt Mattappan +6 a_regex.txt Leominster +7 a_regex.txt Shirley +8 a_regex.txt Quincy +9 a_regex.txt Brookline +10 a_regex.txt Brockton +PREHOOK: query: SELECT id, file, name from tab_b +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_b +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1919070567/10000 +POSTHOOK: query: SELECT id, file, name from tab_b +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_b +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1919070567/10000 +1 b_regex.txt Lowell +2 b_regex.txt Worcester +3 b_regex.txt Roxbury +4 b_regex.txt Dedham +5 b_regex.txt Mattappan +6 b_regex.txt Leominster +7 b_regex.txt Shirley +8 b_regex.txt Quincy +9 b_regex.txt Brookline +10 b_regex.txt Brockton +PREHOOK: query: SELECT * from tab_b +PREHOOK: type: QUERY 
+PREHOOK: Input: default@tab_b +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/348355769/10000 +POSTHOOK: query: SELECT * from tab_b +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_b +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/348355769/10000 +1 b_regex.txt Lowell +2 b_regex.txt Worcester +3 b_regex.txt Roxbury +4 b_regex.txt Dedham +5 b_regex.txt Mattappan +6 b_regex.txt Leominster +7 b_regex.txt Shirley +8 b_regex.txt Quincy +9 b_regex.txt Brookline +10 b_regex.txt Brockton +PREHOOK: query: SELECT a.id, a.file, b.file, a.name +FROM tab_a a JOIN tab_b b +ON a.id = b.id +PREHOOK: type: QUERY +PREHOOK: Input: default@tab_b +PREHOOK: Input: default@tab_a +PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1303397111/10000 +POSTHOOK: query: SELECT a.id, a.file, b.file, a.name +FROM tab_a a JOIN tab_b b +ON a.id = b.id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab_b +POSTHOOK: Input: default@tab_a +POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1303397111/10000 +1 a_regex.txt b_regex.txt Lowell +2 a_regex.txt b_regex.txt Worcester +3 a_regex.txt b_regex.txt Roxbury +4 a_regex.txt b_regex.txt Dedham +5 a_regex.txt b_regex.txt Mattappan +6 a_regex.txt b_regex.txt Leominster +7 a_regex.txt b_regex.txt Shirley +8 a_regex.txt b_regex.txt Quincy +9 a_regex.txt b_regex.txt Brookline +10 a_regex.txt b_regex.txt Brockton +PREHOOK: query: DROP TABLE tab_abc +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_abc +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@tab_abc +PREHOOK: query: DROP TABLE tab_a +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_a +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@tab_a +PREHOOK: query: DROP TABLE tab_b +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_b +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@tab_b +PREHOOK: query: DROP TABLE tab_c +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE tab_c +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: default@tab_c Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 899787) +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy) @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,11 +39,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.cli.CliDriver; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; @@ -93,6 +92,42 @@ private HadoopShims.MiniDFSShim dfs = null; private boolean miniMr = false; + private static final Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}"); + private static final int MAX_SUBST = 20; + + // adapted from o.a.h.conf.Configuration + public static String substituteVars(Properties props, String expr) { + + if (expr == null) { + return null; + } + Matcher match = varPat.matcher(""); + String eval = 
expr; + for(int s=0; s> inputFormatClass, - Class outputFormatClass, - URI dataLocation, Hive hive) throws HiveException { - initEmpty(); - this.schema = schema; - this.deserializer = deserializer; //TODO: convert to SerDeInfo format - this.getTTable().getSd().getSerdeInfo().setSerializationLib(deserializer.getClass().getName()); - getTTable().setTableName(name); - getSerdeInfo().setSerializationLib(deserializer.getClass().getName()); - setInputFormatClass(inputFormatClass); - setOutputFormatClass(HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClass)); - setDataLocation(dataLocation); - } - public Table(String name) { // fill in defaults initEmpty(); @@ -217,6 +190,10 @@ final public Path getPath() { return new Path(getTTable().getSd().getLocation()); } + + final public String getFilenameFilter() { + return getTTable().getSd().getFilenameFilter(); + } final public String getName() { return getTTable().getTableName(); @@ -377,6 +354,10 @@ uri = uri2; getTTable().getSd().setLocation(uri2.toString()); } + + public void setFilenameFilter(String filenameFilter) { + getTTable().getSd().setFilenameFilter(filenameFilter); + } public void setBucketCols(List bucketCols) throws HiveException { if (bucketCols == null) { Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy) @@ -391,6 +391,7 @@ true, JavaUtils.getClassLoader())); table.setDeserializer(MetaStoreUtils.getDeserializer(getConf(), p)); table.setDataLocation(new URI(tTable.getSd().getLocation())); + table.setFilenameFilter(tTable.getSd().getFilenameFilter()); } catch(Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy) @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.Constants; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -396,6 +397,7 @@ Path tblDir = null; tableDesc tblDesc = null; + String tblFilenameFilter = null; PrunedPartitionList partsList = null; @@ -460,6 +462,7 @@ tblDir = paths[0]; tblDesc = Utilities.getTableDesc(part.getTable()); + tblFilenameFilter = tblDesc.getProperties().getProperty(Constants.META_TABLE_FILENAME_FILTER); } for (Path p: paths) { @@ -494,6 +497,11 @@ } plan.getPathToAliases().get(path).add(alias_id); plan.getPathToPartitionInfo().put(path, prtDesc); + + if (tblFilenameFilter != null) { + plan.setFilenameFilterForAlias(alias_id, tblFilenameFilter); + } + LOG.debug("Information added for path " + path); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (working copy) @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.Map.Entry; +import 
java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -34,7 +35,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.mapredWork; import org.apache.hadoop.hive.ql.plan.partitionDesc; -import org.apache.hadoop.hive.ql.plan.tableDesc; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -45,13 +45,16 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; + /** * Map operator. This triggers overall map side processing. * This is a little different from regular operators in that * it starts off by processing a Writable data structure from * a Table (instead of a Hive Object). **/ -public class MapOperator extends Operator implements Serializable { +public class MapOperator extends Operator implements Serializable { private static final long serialVersionUID = 1L; public static enum Counter {DESERIALIZE_ERRORS} @@ -61,18 +64,40 @@ transient private Object[] rowWithPart; transient private StructObjectInspector rowObjectInspector; transient private boolean isPartitioned; - private Map opCtxMap; + + private Map opCtxMap = + new HashMap(); - private Map, java.util.ArrayList> operatorToPaths; + private Map, List> operatorToPaths = + new HashMap, List>(); + + private List childrenPaths = new ArrayList(); - private java.util.ArrayList childrenPaths = new ArrayList(); + private List> extraChildrenToClose = + new ArrayList>(); - private ArrayList> extraChildrenToClose = null; - + private void addOperatorPath(Operator operator, String path) { + List pathList = operatorToPaths.get(operator); + if (null == pathList) { + pathList = new ArrayList(); + operatorToPaths.put(operator, pathList); + } + pathList.add(path); + } + + private List getOperatorPaths(Operator operator) { + List paths = operatorToPaths.get(operator); + if (null == paths) { + return emptyList(); + } else { + return unmodifiableList(paths); + } + } + private static class MapInputPath { - String path; - String alias; - Operator op; + private String path; + private String alias; + private Operator op; /** * @param path @@ -171,7 +196,7 @@ LinkedHashMap partSpec = td.getPartSpec(); Properties tblProps = td.getProperties(); - Class sdclass = td.getDeserializerClass(); + Class sdclass = td.getDeserializerClass(); if(sdclass == null) { String className = td.getSerdeClassName(); if ((className == "") || (className == null)) { @@ -233,39 +258,42 @@ } public void setChildren(Configuration hconf) throws HiveException { - - Path fpath = new Path((new Path(HiveConf.getVar(hconf, - HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath()); - ArrayList> children = + String mapFilename = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME); + Path mapFilePath = new Path((new Path(mapFilename)).toUri().getPath()); + List> childOps = new ArrayList>(); - opCtxMap = new HashMap(); - operatorToPaths = new HashMap, java.util.ArrayList> (); - + statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count); try { boolean done = false; - for (String onefile : conf.getPathToAliases().keySet()) { - MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile); - Path onepath = new Path(new Path(onefile).toUri().getPath()); - List aliases = conf.getPathToAliases().get(onefile); - for (String onealias : aliases) { - 
Operator op = conf.getAliasToWork().get( - onealias); - LOG.info("Adding alias " + onealias + " to work list for file " - + fpath.toUri().getPath()); - MapInputPath inp = new MapInputPath(onefile, onealias, op); + for (String dirPath : conf.getPaths()) { + MapOpCtx opCtx = initObjectInspector(conf, hconf, dirPath); + + for (String tabAlias : conf.getAliasesForPath(dirPath)) { + // Skip file if it doesn't match the filter + String fileNameFilter = conf.getFileNameFilterForAlias(tabAlias); + if (null != fileNameFilter) { + if (!Pattern.compile(fileNameFilter).matcher(mapFilePath.getName()).matches()) { + continue; + } + } + + Operator op = conf.getOperatorForAlias(tabAlias); + LOG.info("Adding alias " + tabAlias + " to work list for file " + mapFilePath.toUri().getPath()); + + MapInputPath inp = new MapInputPath(dirPath, tabAlias, op); opCtxMap.put(inp, opCtx); - if(operatorToPaths.get(op) == null) - operatorToPaths.put(op, new java.util.ArrayList()); - operatorToPaths.get(op).add(onefile); + addOperatorPath(op, dirPath); op.setParentOperators(new ArrayList>()); op.getParentOperators().add(this); + // check for the operators who will process rows coming to this Map Operator - if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { - children.add(op); - childrenPaths.add(onefile); + Path filePath = new Path(new Path(dirPath).toUri().getPath()); + if (!filePath.toUri().relativize(mapFilePath.toUri()).equals(mapFilePath.toUri())) { + childOps.add(op); + childrenPaths.add(dirPath); LOG.info("dump " + op.getName() + " " + opCtxMap.get(inp).getRowObjectInspector().getTypeName()); if (!done) { deserializer = opCtxMap.get(inp).getDeserializer(); @@ -277,16 +305,16 @@ } } } - if (children.size() == 0) { + if (childOps.size() == 0) { // didn't find match for input file path in configuration! // serious problem .. LOG.error("Configuration does not have any alias for path: " - + fpath.toUri().getPath()); + + mapFilePath.toUri().getPath()); throw new HiveException("Configuration and input path are inconsistent"); } // we found all the operators that we are supposed to process. - setChildOperators(children); + setChildOperators(childOps); } catch (Exception e) { throw new HiveException(e); } @@ -307,9 +335,6 @@ Operator op = input.op; // op is not in the children list, so need to remember it and close it afterwards if ( children.indexOf(op) == -1 ) { - if ( extraChildrenToClose == null ) { - extraChildrenToClose = new ArrayList>(); - } extraChildrenToClose.add(op); } @@ -317,8 +342,7 @@ // below logic is to avoid initialize one operator multiple times if there // is one input path in this mapper's input paths. boolean shouldInit = true; - List paths = operatorToPaths.get(op); - for(String path: paths) { + for(String path : getOperatorPaths(op)) { if(childrenPaths.contains(path) && !path.equals(input.path)) { shouldInit = false; break; @@ -333,10 +357,8 @@ * close extra child operators that are initialized but are not executed. 
   */
  public void closeOp(boolean abort) throws HiveException {
-    if ( extraChildrenToClose != null ) {
-      for (Operator<? extends Serializable> op : extraChildrenToClose) {
-        op.close(abort);
-      }
+    for (Operator<? extends Serializable> op : extraChildrenToClose) {
+      op.close(abort);
     }
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java (revision 0)
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+
+public class MapredInputFilter implements PathFilter, Configurable {
+  private Configuration conf;
+  private mapredWork mrWork;
+
+  @Override
+  public boolean accept(Path inFilePath) {
+    String filename = inFilePath.getName();
+    Path tblPath = inFilePath.getParent();
+
+    for (String alias : mrWork.getAliasesForPath(tblPath)) {
+      String filter = mrWork.getFileNameFilterForAlias(alias);
+      if (null == filter || Pattern.compile(filter).matcher(filename).matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    mrWork = Utilities.getMapRedWork(conf);
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java (revision 0)
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class FetchFilenameFilter extends FilenameFilter implements Configurable {
+  private Configuration conf;
+
+  public FetchFilenameFilter() {
+    // zero-arg constructor to keep reflection happy
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String filter = conf.get(HiveConf.ConfVars.HIVEFETCHFILENAMEFILTER.varname);
+    if (filter != null) {
+      addFilter(filter);
+    }
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java (revision 0)
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+
+public class FilenameFilter implements PathFilter {
+
+  private List<String> filters = new ArrayList<String>();
+  protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  @Override
+  public boolean accept(Path filePath) {
+    String filename = filePath.getName();
+    // Path fileDir = filePath.getParent();
+
+    for (String filter : filters) {
+      if (Pattern.compile(filter).matcher(filename).matches()) {
+        LOG.info("Accepting " + filePath.toString());
+        return true;
+      }
+    }
+    LOG.info("Rejecting " + filePath.toString());
+    return false;
+  }
+
+  protected void addFilter(String filter) {
+    filters.add(filter);
+  }
+
+  protected List<String> getFilters() {
+    return filters;
+  }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy)
@@ -23,14 +23,16 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
@@ -41,7 +43,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -54,6 +55,9 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+import static org.apache.hadoop.util.StringUtils.escapeString;
+
 /**
  * FetchTask implementation
  **/
@@ -69,6 +73,10 @@
     this.work = work;
     this.job = job;
 
+    if (work.getFilenameFilter() != null) {
+      job.set(HiveConf.ConfVars.HIVEFETCHFILENAMEFILTER.varname, work.getFilenameFilter());
+    }
+
     currRecReader = null;
     currPath = null;
     currTbl = null;
@@ -81,11 +89,11 @@
   private fetchWork work;
   private int splitNum;
 
-  private RecordReader currRecReader;
+  private RecordReader<WritableComparable<?>, Writable> currRecReader;
+  private FileInputFormat<WritableComparable<?>, Writable> inputFormat;
   private InputSplit[] inputSplits;
-  private InputFormat inputFormat;
   private JobConf job;
-  private WritableComparable key;
+  private WritableComparable<?> key;
   private Writable value;
   private Deserializer serde;
   private Iterator<Path> iterPath;
@@ -100,21 +108,23 @@
   /**
    * A cache of InputFormat instances.
*/ - private static Map> inputFormats = new HashMap>(); + private static Map, FileInputFormat, Writable>> inputFormats = + new HashMap, FileInputFormat, Writable>>(); - static InputFormat getInputFormatFromCache( - Class inputFormatClass, Configuration conf) throws IOException { - if (!inputFormats.containsKey(inputFormatClass)) { + static FileInputFormat, Writable> + getInputFormatFromCache(Class inputFormatClass, Configuration conf) throws IOException { + FileInputFormat, Writable> inputFormat = inputFormats.get(inputFormatClass); + if (inputFormat == null) { try { - InputFormat newInstance = (InputFormat) ReflectionUtils - .newInstance(inputFormatClass, conf); - inputFormats.put(inputFormatClass, newInstance); + inputFormat = + (FileInputFormat, Writable>) ReflectionUtils.newInstance(inputFormatClass, conf); + inputFormats.put(inputFormatClass, inputFormat); } catch (Exception e) { throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName() + " as specified in mapredWork!"); } } - return inputFormats.get(inputFormatClass); + return inputFormat; } private void setPrtnDesc() throws Exception { @@ -124,9 +134,8 @@ String pcols = currPart .getTableDesc() .getProperties() - .getProperty( - org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS); - LinkedHashMap partSpec = currPart.getPartSpec(); + .getProperty(Constants.META_TABLE_PARTITION_COLUMNS); + Map partSpec = currPart.getPartSpec(); List partObjectInspectors = new ArrayList(); String[] partKeys = pcols.trim().split("/"); @@ -147,6 +156,7 @@ } private void getNextPath() throws Exception { + // first time if (iterPath == null) { if (work.getTblDir() != null) { @@ -155,8 +165,7 @@ currTbl = work.getTblDesc(); FileSystem fs = currPath.getFileSystem(job); if (fs.exists(currPath)) { - FileStatus[] fStats = fs.listStatus(currPath); - for (FileStatus fStat : fStats) { + for (FileStatus fStat : fs.listStatus(currPath)) { if (fStat.getLen() > 0) { tblDataDone = true; break; @@ -183,20 +192,19 @@ partitionDesc prt = iterPartDesc.next(); FileSystem fs = nxt.getFileSystem(job); if (fs.exists(nxt)) { - FileStatus[] fStats = fs.listStatus(nxt); - for (FileStatus fStat : fStats) { + for (FileStatus fStat : fs.listStatus(nxt)) { if (fStat.getLen() > 0) { currPath = nxt; currPart = prt; - return; } } } } } - private RecordReader getRecordReader() - throws Exception { + + private RecordReader, Writable> getRecordReader() + throws Exception { if (currPath == null) { getNextPath(); if (currPath == null) @@ -205,13 +213,19 @@ // not using FileInputFormat.setInputPaths() here because it forces a connection // to the default file system - which may or may not be online during pure metadata // operations - job.set("mapred.input.dir", - org.apache.hadoop.util.StringUtils.escapeString(currPath.toString())); + job.set("mapred.input.dir", escapeString(currPath.toString())); tableDesc tmp = currTbl; - if (tmp == null) + if (tmp == null) { tmp = currPart.getTableDesc(); + } + inputFormat = getInputFormatFromCache(tmp.getInputFileFormatClass(), job); + + if (work.getFilenameFilter() != null) { + FileInputFormat.setInputPathFilter(job, FetchFilenameFilter.class); + } + inputSplits = inputFormat.getSplits(job, 1); splitNum = 0; serde = tmp.getDeserializerClass().newInstance(); @@ -285,14 +299,13 @@ currRecReader = null; } } catch (Exception e) { - throw new HiveException("Failed with exception " + e.getMessage() - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new 
HiveException("Failed with exception " + + e.getMessage() + stringifyException(e)); } } public ObjectInspector getOutputObjectInspector() throws HiveException { try { - ObjectInspector outInspector; if (work.getTblDir() != null) { tableDesc tbl = work.getTblDesc(); Deserializer serde = tbl.getDeserializerClass().newInstance(); @@ -309,8 +322,9 @@ return rowObjectInspector; } } catch (Exception e) { - throw new HiveException("Failed with exception " + e.getMessage() - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + throw new HiveException("Failed with exception " + + e.getMessage() + stringifyException(e)); } } + } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -18,48 +18,70 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.*; -import java.text.SimpleDateFormat; -import java.util.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.net.URI; +import java.net.URLDecoder; import java.net.URLEncoder; -import java.net.URLDecoder; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; import org.apache.commons.logging.LogFactory; -import org.apache.commons.lang.StringUtils; - +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.*; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.history.HiveHistory; +import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.mapredWork; +import org.apache.hadoop.hive.ql.plan.partitionDesc; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskCompletionEvent; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import 
org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.ql.plan.mapredWork; -import org.apache.hadoop.hive.ql.plan.partitionDesc; -import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; -import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; -import org.apache.hadoop.hive.ql.io.*; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.varia.NullAppender; -import java.lang.ClassNotFoundException; -import org.apache.hadoop.hive.common.FileUtils; +import static org.apache.commons.lang.StringUtils.join; +import static org.apache.commons.lang.StringUtils.split; +import static org.apache.commons.lang.StringUtils.isEmpty; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.hadoop.util.StringUtils.stringifyException; + + public class ExecDriver extends Task implements Serializable { private static final long serialVersionUID = 1L; @@ -82,7 +104,7 @@ SessionState ss = SessionState.get(); Set files = (ss == null) ? null : ss.list_resource(t, null); if (files != null) { - ArrayList realFiles = new ArrayList(files.size()); + List realFiles = new ArrayList(files.size()); for (String one : files) { try { realFiles.add(Utilities.realFile(one, conf)); @@ -91,7 +113,7 @@ + "due to exception: " + e.getMessage(), e); } } - return StringUtils.join(realFiles, ","); + return join(realFiles, ","); } else { return ""; } @@ -118,15 +140,15 @@ // "tmpfiles" and "tmpjars" are set by the method ExecDriver.execute(), // which will be called by both local and NON-local mode. 
String addedFiles = getResourceFiles(job, SessionState.ResourceType.FILE); - if (StringUtils.isNotBlank(addedFiles)) { + if (isNotBlank(addedFiles)) { HiveConf.setVar(job, ConfVars.HIVEADDEDFILES, addedFiles); } String addedJars = getResourceFiles(job, SessionState.ResourceType.JAR); - if (StringUtils.isNotBlank(addedJars)) { + if (isNotBlank(addedJars)) { HiveConf.setVar(job, ConfVars.HIVEADDEDJARS, addedJars); } String addedArchives = getResourceFiles(job, SessionState.ResourceType.ARCHIVE); - if (StringUtils.isNotBlank(addedArchives)) { + if (isNotBlank(addedArchives)) { HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives); } } @@ -135,7 +157,7 @@ * Constructor/Initialization for invocation as independent utility */ public ExecDriver(mapredWork plan, JobConf job, boolean isSilent) - throws HiveException { + throws HiveException { setWork(plan); this.job = job; LOG = LogFactory.getLog(this.getClass().getName()); @@ -268,8 +290,7 @@ JobClient jc = th.getJobClient(); RunningJob rj = th.getRunningJob(); String lastReport = ""; - SimpleDateFormat dateFormat - = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); long reportTime = System.currentTimeMillis(); long maxReportInterval = 60 * 1000; // One minute boolean fatal = false; @@ -305,14 +326,11 @@ String output = dateFormat.format(Calendar.getInstance().getTime()) + report; SessionState ss = SessionState.get(); if (ss != null) { - ss.getHiveHistory().setTaskCounters( - SessionState.get().getQueryId(), getId(), rj); - ss.getHiveHistory().setTaskProperty( - SessionState.get().getQueryId(), getId(), - Keys.TASK_HADOOP_PROGRESS, output); - ss.getHiveHistory().progressTask( - SessionState.get().getQueryId(), this); - ss.getHiveHistory().logPlanProgress(queryPlan); + HiveHistory history = ss.getHiveHistory(); + history.setTaskCounters(ss.getQueryId(), getId(), rj); + history.setTaskProperty(ss.getQueryId(), getId(), Keys.TASK_HADOOP_PROGRESS, output); + history.progressTask(ss.getQueryId(), this); + history.logPlanProgress(queryPlan); } console.printInfo(output); lastReport = report; @@ -399,7 +417,7 @@ public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException { long r = 0; // For each input path, calculate the total size. 
- for (String path: work.getPathToAliases().keySet()) { + for (String path : work.getPathToAliases().keySet()) { try { Path p = new Path(path); FileSystem fs = p.getFileSystem(job); @@ -458,8 +476,7 @@ } catch(IOException e) { String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: " + e.getMessage(); - console.printError(statusMesg, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + console.printError(statusMesg, "\n" + stringifyException(e)); return 1; } @@ -485,13 +502,16 @@ fs.mkdirs(emptyScratchDir); break; } catch (Exception e) { - if (numTries > 0) + if (numTries > 0) { numTries--; - else - throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage()); + } else { + throw new RuntimeException("Failed to make dir " + + emptyScratchDir.toString() + " : " + e.getMessage()); + } } } + FileInputFormat.setInputPathFilter(job, MapredInputFilter.class); FileOutputFormat.setOutputPath(job, jobScratchDir); job.setMapperClass(ExecMapper.class); @@ -506,8 +526,9 @@ HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS)); String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT); - if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat))) + if (inpFormat == null || !isNotBlank(inpFormat)) { inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName(); + } LOG.info("Using " + inpFormat); @@ -524,10 +545,10 @@ // Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands it String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS); String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS); - if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) { + if (isNotBlank(auxJars) || isNotBlank(addedJars)) { String allJars = - StringUtils.isNotBlank(auxJars) - ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars) + isNotBlank(auxJars) + ? (isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars) : addedJars; LOG.info("adding libjars: " + allJars); initializeFiles("tmpjars", allJars); @@ -535,20 +556,19 @@ // Transfer HIVEADDEDFILES to "tmpfiles" so hadoop understands it String addedFiles = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDFILES); - if (StringUtils.isNotBlank(addedFiles)) { + if (isNotBlank(addedFiles)) { initializeFiles("tmpfiles", addedFiles); } // Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES); - if (StringUtils.isNotBlank(addedArchives)) { + if (isNotBlank(addedArchives)) { initializeFiles("tmparchives", addedArchives); } int returnVal = 0; RunningJob rj = null, orig_rj = null; - boolean noName = StringUtils.isEmpty(HiveConf. 
- getVar(job,HiveConf.ConfVars.HADOOPJOBNAME)); + boolean noName = isEmpty(HiveConf.getVar(job,HiveConf.ConfVars.HADOOPJOBNAME)); if(noName) { // This is for a special case to ensure unit tests pass @@ -557,7 +577,7 @@ try { addInputPaths(job, work, emptyScratchDirStr); - + Utilities.setMapRedWork(job, work); // remove the pwd from conf file so that job tracker doesn't show this logs @@ -610,8 +630,7 @@ } // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils - console.printError(mesg, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + console.printError(mesg, "\n" + stringifyException(e)); success = false; returnVal = 1; @@ -647,8 +666,7 @@ success = false; returnVal = 3; String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'"; - console.printError(mesg, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); + console.printError(mesg, "\n" + stringifyException(e)); } } @@ -658,8 +676,8 @@ private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException { Map failures = new HashMap(); - Set successes = new HashSet (); - Map taskToJob = new HashMap(); + Set successes = new HashSet(); + Map taskToJob = new HashMap(); int startIndex = 0; @@ -801,7 +819,7 @@ URI pathURI = (new Path(planFileName)).toUri(); InputStream pathData; - if (StringUtils.isEmpty(pathURI.getScheme())) { + if (isEmpty(pathURI.getScheme())) { // default to local file system pathData = new FileInputStream(planFileName); } else { @@ -821,11 +839,11 @@ try { // see also - code in CliDriver.java ClassLoader loader = conf.getClassLoader(); - if (StringUtils.isNotBlank(auxJars)) { - loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ",")); + if (isNotBlank(auxJars)) { + loader = Utilities.addToClassPath(loader, split(auxJars, ",")); } - if (StringUtils.isNotBlank(addedJars)) { - loader = Utilities.addToClassPath(loader, StringUtils.split(addedJars, ",")); + if (isNotBlank(addedJars)) { + loader = Utilities.addToClassPath(loader, split(addedJars, ",")); } conf.setClassLoader(loader); // Also set this to the Thread ContextClassLoader, so new threads will inherit @@ -984,48 +1002,53 @@ return numEmptyPaths; } + private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir) throws Exception { int numEmptyPaths = 0; List pathsProcessed = new ArrayList(); // AliasToWork contains all the aliases - for (String oneAlias : work.getAliasToWork().keySet()) { - LOG.info("Processing alias " + oneAlias); - List emptyPaths = new ArrayList(); + for (String opAlias : work.getOperatorAliases()) { + LOG.info("Processing alias " + opAlias); + List emptyPaths = new ArrayList(); - // The alias may not have any path - String path = null; - for (String onefile : work.getPathToAliases().keySet()) { - List aliases = work.getPathToAliases().get(onefile); - if (aliases.contains(oneAlias)) { - path = onefile; + // There may not be any path associated with the operator + boolean hasPath = false; + for (String path : work.getPaths()) { + if (work.getAliasesForPath(path).contains(opAlias)) { + hasPath = true; // Multiple aliases can point to the same path - it should be processed only once - if (pathsProcessed.contains(path)) + if (pathsProcessed.contains(path)) { continue; + } + pathsProcessed.add(path); - LOG.info("Adding input file " + path); - - if (!isEmptyPath(job, path)) + LOG.info("Adding input directory " + path); + + if (!isEmptyPath(job, path)) { FileInputFormat.addInputPaths(job, path); - 
else + } else { emptyPaths.add(path); + } } } // Create a empty file if the directory is empty - for (String emptyPath : emptyPaths) - numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, oneAlias); + for (String emptyPath : emptyPaths) { + numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, opAlias); + } // If the query references non-existent partitions // We need to add a empty file, it is not acceptable to change the operator tree // Consider the query: // select * from (select count(1) from T union all select count(1) from T2) x; // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows) - if (path == null) - numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias); + if (!hasPath) { + numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, opAlias); + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -1218,6 +1218,10 @@ tbl.setProperty("comment", crtTbl.getComment()); if (crtTbl.getLocation() != null) tblStorDesc.setLocation(crtTbl.getLocation()); + + if (crtTbl.isExternal()) { + tblStorDesc.setFilenameFilter(crtTbl.getFilenameFilter()); + } tbl.setInputFormatClass(crtTbl.getInputFormat()); tbl.setOutputFormatClass(crtTbl.getOutputFormat()); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java (working copy) @@ -46,6 +46,7 @@ String inputFormat; String outputFormat; String location; + String filenameFilter; String serName; Map mapProp; boolean ifNotExists; @@ -58,7 +59,8 @@ String collItemDelim, String mapKeyDelim, String lineDelim, String comment, String inputFormat, String outputFormat, - String location, String serName, Map mapProp, + String location, String filenameFilter, + String serName, Map mapProp, boolean ifNotExists) { this.tableName = tableName; this.isExternal = isExternal; @@ -73,6 +75,7 @@ this.outputFormat = outputFormat; this.lineDelim = lineDelim; this.location = location; + this.filenameFilter = filenameFilter; this.mapKeyDelim = mapKeyDelim; this.numBuckets = numBuckets; this.partCols = partCols; @@ -224,6 +227,15 @@ this.location = location; } + @explain(displayName="filenameFilter") + public String getFilenameFilter() { + return filenameFilter; + } + + public void setFilenameFilter(String filenameFilter) { + this.filenameFilter = filenameFilter; + } + @explain(displayName="isExternal") public boolean isExternal() { return isExternal; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java (working copy) @@ -18,12 +18,21 @@ package org.apache.hadoop.hive.ql.plan; -import java.util.*; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import 
java.util.Set; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; +import static java.util.Collections.emptySet; + @explain(displayName="Map Reduce") public class mapredWork implements Serializable { private static final long serialVersionUID = 1L; @@ -31,11 +40,13 @@ // map side work // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing - private LinkedHashMap> pathToAliases; + private LinkedHashMap> pathToAliases; - private LinkedHashMap pathToPartitionInfo; + private LinkedHashMap aliasToFilenameFilter; + + private LinkedHashMap pathToPartitionInfo; - private LinkedHashMap> aliasToWork; + private LinkedHashMap> aliasToWork; private LinkedHashMap aliasToPartnInfo; @@ -55,6 +66,7 @@ public mapredWork() { this.aliasToPartnInfo = new LinkedHashMap(); + this.aliasToFilenameFilter = new LinkedHashMap(); } public mapredWork( @@ -77,6 +89,7 @@ this.numReduceTasks = numReduceTasks; this.mapLocalWork = mapLocalWork; this.aliasToPartnInfo = new LinkedHashMap(); + this.aliasToFilenameFilter = new LinkedHashMap(); } public String getCommand() { @@ -93,7 +106,50 @@ public void setPathToAliases(final LinkedHashMap> pathToAliases) { this.pathToAliases = pathToAliases; } + + public Set getPaths() { + return pathToAliases.keySet(); + } + + public void setAliasToFilenameFilter(LinkedHashMap aliasToFilenameFilter) { + this.aliasToFilenameFilter = aliasToFilenameFilter; + } + + public LinkedHashMap getAliasToFilenameFilter() { + return aliasToFilenameFilter; + } + + public Set getAliasesForPath(String path) { + List aliases = pathToAliases.get(path); + if (null != aliases) { + return new HashSet(aliases); + } else { + return emptySet(); + } + } + + public Set getAliasesForPath(Path path) { + return getAliasesForPath(path.toString()); + } + + public boolean containsPath(Path path) { + return containsPath(path.toUri().getPath()); + } + + public boolean containsPath(String path) { + return getPaths().contains(path); + } + + public String getFileNameFilterForAlias(String alias) { + return aliasToFilenameFilter.get(alias); + } + + + public void setFilenameFilterForAlias(String alias, String filter) { + aliasToFilenameFilter.put(alias, filter); + } + @explain(displayName="Path -> Partition", normalExplain=false) public LinkedHashMap getPathToPartitionInfo() { return this.pathToPartitionInfo; @@ -127,6 +183,18 @@ } + public Set getAliases() { + return aliasToWork.keySet(); + } + + public Set getOperatorAliases() { + return aliasToWork.keySet(); + } + + public Operator getOperatorForAlias(String alias) { + return aliasToWork.get(alias); + } + /** * @return the mapredLocalWork */ Index: ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java (working copy) @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.plan.tableDesc; @@ -29,13 +30,17 @@ public class fetchWork implements Serializable { private static final long serialVersionUID = 1L; + private static final int NO_LIMIT = -1; + private String tblDir; private tableDesc tblDesc; + + private String filenameFilter; private List partDir; private List partDesc; - private int limit; + private int limit 
= NO_LIMIT; /** * Serialization Null Format for the serde used to fetch data @@ -45,7 +50,7 @@ public fetchWork() { } public fetchWork(String tblDir, tableDesc tblDesc) { - this(tblDir, tblDesc, -1); + this(tblDir, tblDesc, NO_LIMIT); } public fetchWork(String tblDir, tableDesc tblDesc, int limit) { @@ -55,7 +60,7 @@ } public fetchWork(List partDir, List partDesc) { - this(partDir, partDesc, -1); + this(partDir, partDesc, NO_LIMIT); } public fetchWork(List partDir, List partDesc, int limit) { @@ -79,21 +84,30 @@ return tblDir; } - /** + /** + * @param tblDir the tblDir to set + */ + public void setTblDir(String tblDir) { + this.tblDir = tblDir; + } + + /** * @return the tblDir */ public Path getTblDirPath() { return new Path(tblDir); } - /** - * @param tblDir the tblDir to set - */ - public void setTblDir(String tblDir) { - this.tblDir = tblDir; - } - - /** + + public void setFilenameFilter(String filenameFilter) { + this.filenameFilter = filenameFilter; + } + + public String getFilenameFilter() { + return filenameFilter; + } + + /** * @return the tblDesc */ public tableDesc getTblDesc() { @@ -111,10 +125,16 @@ * @return the partDir */ public List getPartDir() { - return partDir; + return new ArrayList(partDir); } - + /** + * @param partDir the partDir to set + */ + public void setPartDir(List partDir) { + this.partDir = new ArrayList(partDir); + } + public List getPartDirPath() { return fetchWork.convertStringToPathArray(partDir); } @@ -141,12 +161,6 @@ return pathsStr; } - /** - * @param partDir the partDir to set - */ - public void setPartDir(List partDir) { - this.partDir = partDir; - } /** * @return the partDesc Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy) @@ -121,7 +121,9 @@ LATERAL_VIEW_WITH_JOIN("Join with a lateral view is not supported"), LATERAL_VIEW_INVALID_CHILD("Lateral view AST with invalid child"), OUTPUT_SPECIFIED_MULTIPLE_TIMES("The same output cannot be present multiple times: "), - INVALID_AS("AS clause has an invalid number of aliases"); + INVALID_AS("AS clause has an invalid number of aliases"), + INVALID_FILENAME_FILTER_CLAUSE("Filename filters are only applicable to EXTERNAL tables"); + private String mesg; private String SQLState; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy) @@ -556,7 +556,7 @@ @init { msgs.push("table location specification"); } @after { msgs.pop(); } : - KW_LOCATION locn=StringLiteral -> ^(TOK_TABLELOCATION $locn) + KW_LOCATION locn=StringLiteral (KW_FILENAMEFILTER filt=StringLiteral)? -> ^(TOK_TABLELOCATION $locn $filt?) ; columnNameTypeList @@ -1518,6 +1518,7 @@ KW_RECORDWRITER: 'RECORDWRITER'; KW_SEMI: 'SEMI'; KW_LATERAL: 'LATERAL'; +KW_FILENAMEFILTER: 'FILENAME_FILTER'; // Operators // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work. 
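
Note on the Hive.g change above: KW_FILENAMEFILTER maps to the token text 'FILENAME_FILTER', and the tableLocation rule makes the clause optional after LOCATION, so DDL along the lines of CREATE EXTERNAL TABLE logs (line STRING) LOCATION '/data/logs' FILENAME_FILTER 'part-.*' is expected to parse (table name, path, and regex here are illustrative only). On the execution side, ExecDriver registers MapredInputFilter through FileInputFormat.setInputPathFilter(), but that class is not included in this excerpt. The fragment below is a minimal sketch, assuming a filter that reads a single job-level regex from hive.exec.fetch.filename.filter (the ConfVar added in HiveConf); the actual patch keys filters per table alias via mapredWork.getAliasToFilenameFilter(), so treat this as an illustration rather than the patch's implementation.

import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexFilenameFilter implements PathFilter, Configurable {

  private Configuration conf;
  private Pattern pattern; // null means "accept everything"

  public void setConf(Configuration conf) {
    this.conf = conf;
    // Hypothetical wiring: read one job-wide regex. The real patch tracks a
    // filter per alias in mapredWork, so this property lookup is an assumption.
    String regex = conf.get("hive.exec.fetch.filename.filter", "");
    pattern = (regex.length() == 0) ? null : Pattern.compile(regex);
  }

  public Configuration getConf() {
    return conf;
  }

  public boolean accept(Path path) {
    if (pattern == null) {
      return true;
    }
    // A production filter would also need to let the registered input
    // directories through, since FileInputFormat applies the filter to the
    // input paths themselves as well as to the files listed under them;
    // that check is omitted in this sketch.
    return pattern.matcher(path.getName()).matches();
  }
}

A call such as FileInputFormat.setInputPathFilter(job, RegexFilenameFilter.class) is all that is needed for Hadoop to consult the filter while computing splits, which mirrors how the patch hooks in MapredInputFilter.
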
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 899787) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -4865,7 +4865,11 @@ Table tab = ((Map.Entry)iter.next()).getValue(); if (!tab.isPartitioned()) { if (qbParseInfo.getDestToWhereExpr().isEmpty()) { - fetch = new fetchWork(tab.getPath().toString(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit()); + fetch = new fetchWork(); + fetch.setTblDir(tab.getPath().toString()); + fetch.setTblDesc(Utilities.getTableDesc(tab)); + fetch.setFilenameFilter(tab.getFilenameFilter()); + fetch.setLimit(qb.getParseInfo().getOuterQueryLimit()); noMapRed = true; inputs.add(new ReadEntity(tab)); } @@ -4905,7 +4909,10 @@ inputs.add(new ReadEntity(part)); } - fetch = new fetchWork(listP, partP, qb.getParseInfo().getOuterQueryLimit()); + fetch = new fetchWork(); + fetch.setPartDir(listP); + fetch.setPartDesc(partP); + fetch.setLimit(qb.getParseInfo().getOuterQueryLimit()); noMapRed = true; } } @@ -4926,17 +4933,22 @@ if (qb.getIsQuery()) { if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1)) throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg()); - String cols = loadFileWork.get(0).getColumns(); - String colTypes = loadFileWork.get(0).getColumnTypes(); + loadFileDesc loadDesc = loadFileWork.get(0); + + String cols = loadDesc.getColumns(); + String colTypes = loadDesc.getColumnTypes(); - fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(), - new tableDesc(LazySimpleSerDe.class, TextInputFormat.class, - IgnoreKeyTextOutputFormat.class, - Utilities.makeProperties( - org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode, - org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols, - org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)), - qb.getParseInfo().getOuterQueryLimit()); + fetch = new fetchWork(); + fetch.setTblDir(new Path(loadDesc.getSourceDir()).toString()); + fetch.setTblDesc(new tableDesc(LazySimpleSerDe.class, + TextInputFormat.class, IgnoreKeyTextOutputFormat.class, Utilities + .makeProperties( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, + "" + Utilities.ctrlaCode, + org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols, + org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, + colTypes))); + fetch.setLimit(qb.getParseInfo().getOuterQueryLimit()); fetchTask = TaskFactory.get(fetch, this.conf); setFetchTask(fetchTask); @@ -5377,6 +5389,7 @@ String inputFormat = TEXTFILE_INPUT; String outputFormat = TEXTFILE_OUTPUT; String location = null; + String filter = null; String serde = null; Map mapProp = null; boolean ifNotExists = false; @@ -5527,6 +5540,9 @@ break; case HiveParser.TOK_TABLELOCATION: location = unescapeSQLString(child.getChild(0).getText()); + if (child.getChildCount() == 2) { + filter = unescapeSQLString(child.getChild(1).getText()); + } break; default: assert false; } @@ -5554,7 +5570,7 @@ sortCols, numBuckets, fieldDelim, fieldEscape, collItemDelim, mapKeyDelim, lineDelim, - comment, inputFormat, outputFormat, location, serde, + comment, inputFormat, outputFormat, location, filter, serde, mapProp, ifNotExists); validateCreateTable(crtTblDesc); @@ -5586,7 +5602,7 @@ sortCols, numBuckets, fieldDelim, fieldEscape, collItemDelim, mapKeyDelim, lineDelim, - comment, inputFormat, outputFormat, location, 
serde, + comment, inputFormat, outputFormat, location, filter, serde, mapProp, ifNotExists); qb.setTableDesc(crtTblDesc); @@ -5597,10 +5613,14 @@ } private void validateCreateTable(createTableDesc crtTblDesc) throws SemanticException { + + if (!crtTblDesc.isExternal() && null != crtTblDesc.getFilenameFilter()) { + throw new SemanticException(ErrorMsg.INVALID_FILENAME_FILTER_CLAUSE.getMsg()); + } + // no duplicate column names // currently, it is a simple n*n algorithm - this can be optimized later if need be // but it should not be a major bottleneck as the number of columns are anyway not so big - if((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) { // for now make sure that serde exists if(StringUtils.isEmpty(crtTblDesc.getSerName()) || SerDeUtils.isNativeSerDe(crtTblDesc.getSerName())) {
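
Two reader-facing notes on the fetch path shown above: validateCreateTable() rejects FILENAME_FILTER on managed tables via ErrorMsg.INVALID_FILENAME_FILTER_CLAUSE, and SemanticAnalyzer copies tab.getFilenameFilter() into the fetchWork for unpartitioned tables, but the FetchTask/FetchOperator code that actually consumes fetchWork.getFilenameFilter() is not part of this excerpt. The fragment below is a minimal sketch, under that assumption, of how such a consumer could narrow the files read from the table directory; the class and method names are invented for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FetchFilterSketch {

  private FetchFilterSketch() {
  }

  /**
   * Lists the data files under tblDir, keeping only those whose names match
   * the optional filename filter regex. A null or empty filter keeps all files.
   */
  public static List<Path> listDataFiles(FileSystem fs, Path tblDir,
      String filenameFilter) throws IOException {
    Pattern pattern = (filenameFilter == null || filenameFilter.length() == 0)
        ? null : Pattern.compile(filenameFilter);

    List<Path> files = new ArrayList<Path>();
    for (FileStatus stat : fs.listStatus(tblDir)) {
      if (stat.isDir()) {
        continue; // sketch: skip nested directories
      }
      if (pattern == null || pattern.matcher(stat.getPath().getName()).matches()) {
        files.add(stat.getPath());
      }
    }
    return files;
  }
}

Given the earlier DDL example, listDataFiles(fs, new Path("/data/logs"), "part-.*") would return only the part-* files, which matches the intent of restricting an external table to a subset of the files in its directory.
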