Index: build-common.xml
===================================================================
--- build-common.xml (revision 899787)
+++ build-common.xml (working copy)
@@ -295,21 +295,25 @@
-->
-
-
-
+
+
+
+
+
+
+
+
-
+
+
-
-
-
+
Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 899787)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -72,6 +72,7 @@
POSTEXECHOOKS("hive.exec.post.hooks", ""),
EXECPARALLEL("hive.exec.parallel",false), // parallel query launching
HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution",true),
+ HIVEFETCHFILENAMEFILTER("hive.exec.fetch.filename.filter", ""),
// hadoop stuff
HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),
Index: metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java
===================================================================
--- metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java (revision 899787)
+++ metastore/src/model/org/apache/hadoop/hive/metastore/model/MStorageDescriptor.java (working copy)
@@ -24,6 +24,7 @@
public class MStorageDescriptor {
private List<MFieldSchema> cols;
private String location;
+ private String filenameFilter;
private String inputFormat;
private String outputFormat;
private boolean isCompressed = false;
@@ -39,6 +40,7 @@
/**
* @param cols
* @param location
+ * @param filenameFilter
* @param inputFormat
* @param outputFormat
* @param isCompressed
@@ -48,11 +50,12 @@
* @param sortOrder
* @param parameters
*/
- public MStorageDescriptor(List<MFieldSchema> cols, String location, String inputFormat,
+ public MStorageDescriptor(List<MFieldSchema> cols, String location, String filenameFilter, String inputFormat,
String outputFormat, boolean isCompressed, int numBuckets, MSerDeInfo serDeInfo,
List<String> bucketCols, List<MOrder> sortOrder, Map<String, String> parameters) {
this.cols = cols;
this.location = location;
+ this.filenameFilter = filenameFilter;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.isCompressed = isCompressed;
@@ -77,8 +80,22 @@
public void setLocation(String location) {
this.location = location;
}
+
+ /**
+ * @return the filename filter
+ */
+ public String getFilenameFilter() {
+ return filenameFilter;
+ }
/**
+ * @param filenameFilter the filename filter to set
+ */
+ public void setFilenameFilter(String filenameFilter) {
+ this.filenameFilter = filenameFilter;
+ }
+
+ /**
* @return the isCompressed
*/
public boolean isCompressed() {
Index: metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (revision 899787)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (working copy)
@@ -657,6 +657,7 @@
return new StorageDescriptor(
convertToFieldSchemas(msd.getCols()),
msd.getLocation(),
+ msd.getFilenameFilter(),
msd.getInputFormat(),
msd.getOutputFormat(),
msd.isCompressed(),
@@ -672,6 +673,7 @@
return new MStorageDescriptor(
convertToMFieldSchemas(sd.getCols()),
sd.getLocation(),
+ sd.getFilenameFilter(),
sd.getInputFormat(),
sd.getOutputFormat(),
sd.isCompressed(),
Index: metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
===================================================================
--- metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (revision 899787)
+++ metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (working copy)
@@ -141,7 +141,7 @@
* @return the Deserializer
* @exception MetaException if any problems instantiating the Deserializer
*
- * todo - this should move somewhere into serde.jar
+ * TODO - this should move somewhere into serde.jar
*
*/
static public Deserializer getDeserializer(Configuration conf, Properties schema) throws MetaException {
@@ -167,7 +167,7 @@
* @return the Deserializer
* @exception MetaException if any problems instantiating the Deserializer
*
- * todo - this should move somewhere into serde.jar
+ * TODO - this should move somewhere into serde.jar
*
*/
static public Deserializer getDeserializer(Configuration conf, org.apache.hadoop.hive.metastore.api.Table table) throws MetaException {
@@ -297,6 +297,7 @@
t.setSd(new StorageDescriptor());
t.setTableName(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME));
t.getSd().setLocation(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_LOCATION));
+ t.getSd().setFilenameFilter(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_FILENAME_FILTER));
t.getSd().setInputFormat(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_INPUT_FORMAT,
org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName()));
t.getSd().setOutputFormat(schema.getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_OUTPUT_FORMAT,
@@ -513,6 +514,9 @@
if(sd.getLocation() != null) {
schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_LOCATION, sd.getLocation());
}
+ if (sd.getFilenameFilter() != null) {
+ schema.setProperty(Constants.META_TABLE_FILENAME_FILTER, sd.getFilenameFilter());
+ }
schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.BUCKET_COUNT, Integer.toString(sd.getNumBuckets()));
if (sd.getBucketCols().size() > 0) {
schema.setProperty(org.apache.hadoop.hive.metastore.api.Constants.BUCKET_FIELD_NAME, sd.getBucketCols().get(0));
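Reviewer note: a small sketch (illustrative only, not part of the patch) of the property plumbing above: getSchema() writes the filter under this key, and GenMapRedUtils later reads it back under the same key.

import java.util.Properties;
import org.apache.hadoop.hive.metastore.api.Constants;

public class SchemaPropertySketch {
  public static void main(String[] args) {
    Properties schema = new Properties();
    // getSchema() copies sd.getFilenameFilter() under this key when it is non-null.
    schema.setProperty(Constants.META_TABLE_FILENAME_FILTER, "^a.*"); // illustrative value
    System.out.println(schema.getProperty(Constants.META_TABLE_FILENAME_FILTER)); // ^a.*
  }
}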
Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
===================================================================
--- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (revision 899787)
+++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (working copy)
@@ -22,35 +22,38 @@
private static final TStruct STRUCT_DESC = new TStruct("StorageDescriptor");
private static final TField COLS_FIELD_DESC = new TField("cols", TType.LIST, (short)1);
private static final TField LOCATION_FIELD_DESC = new TField("location", TType.STRING, (short)2);
- private static final TField INPUT_FORMAT_FIELD_DESC = new TField("inputFormat", TType.STRING, (short)3);
- private static final TField OUTPUT_FORMAT_FIELD_DESC = new TField("outputFormat", TType.STRING, (short)4);
- private static final TField COMPRESSED_FIELD_DESC = new TField("compressed", TType.BOOL, (short)5);
- private static final TField NUM_BUCKETS_FIELD_DESC = new TField("numBuckets", TType.I32, (short)6);
- private static final TField SERDE_INFO_FIELD_DESC = new TField("serdeInfo", TType.STRUCT, (short)7);
- private static final TField BUCKET_COLS_FIELD_DESC = new TField("bucketCols", TType.LIST, (short)8);
- private static final TField SORT_COLS_FIELD_DESC = new TField("sortCols", TType.LIST, (short)9);
- private static final TField PARAMETERS_FIELD_DESC = new TField("parameters", TType.MAP, (short)10);
+ private static final TField FILENAME_FILTER_FIELD_DESC = new TField("filenameFilter", TType.STRING, (short)3);
+ private static final TField INPUT_FORMAT_FIELD_DESC = new TField("inputFormat", TType.STRING, (short)4);
+ private static final TField OUTPUT_FORMAT_FIELD_DESC = new TField("outputFormat", TType.STRING, (short)5);
+ private static final TField COMPRESSED_FIELD_DESC = new TField("compressed", TType.BOOL, (short)6);
+ private static final TField NUM_BUCKETS_FIELD_DESC = new TField("numBuckets", TType.I32, (short)7);
+ private static final TField SERDE_INFO_FIELD_DESC = new TField("serdeInfo", TType.STRUCT, (short)8);
+ private static final TField BUCKET_COLS_FIELD_DESC = new TField("bucketCols", TType.LIST, (short)9);
+ private static final TField SORT_COLS_FIELD_DESC = new TField("sortCols", TType.LIST, (short)10);
+ private static final TField PARAMETERS_FIELD_DESC = new TField("parameters", TType.MAP, (short)11);
private List<FieldSchema> cols;
public static final int COLS = 1;
private String location;
public static final int LOCATION = 2;
+ private String filenameFilter;
+ public static final int FILENAMEFILTER = 3;
private String inputFormat;
- public static final int INPUTFORMAT = 3;
+ public static final int INPUTFORMAT = 4;
private String outputFormat;
- public static final int OUTPUTFORMAT = 4;
+ public static final int OUTPUTFORMAT = 5;
private boolean compressed;
- public static final int COMPRESSED = 5;
+ public static final int COMPRESSED = 6;
private int numBuckets;
- public static final int NUMBUCKETS = 6;
+ public static final int NUMBUCKETS = 7;
private SerDeInfo serdeInfo;
- public static final int SERDEINFO = 7;
+ public static final int SERDEINFO = 8;
private List<String> bucketCols;
- public static final int BUCKETCOLS = 8;
+ public static final int BUCKETCOLS = 9;
private List<Order> sortCols;
- public static final int SORTCOLS = 9;
+ public static final int SORTCOLS = 10;
private Map<String,String> parameters;
- public static final int PARAMETERS = 10;
+ public static final int PARAMETERS = 11;
private final Isset __isset = new Isset();
private static final class Isset implements java.io.Serializable {
@@ -64,6 +67,8 @@
new StructMetaData(TType.STRUCT, FieldSchema.class))));
put(LOCATION, new FieldMetaData("location", TFieldRequirementType.DEFAULT,
new FieldValueMetaData(TType.STRING)));
+ put(FILENAMEFILTER, new FieldMetaData("filenameFilter", TFieldRequirementType.DEFAULT,
+ new FieldValueMetaData(TType.STRING)));
put(INPUTFORMAT, new FieldMetaData("inputFormat", TFieldRequirementType.DEFAULT,
new FieldValueMetaData(TType.STRING)));
put(OUTPUTFORMAT, new FieldMetaData("outputFormat", TFieldRequirementType.DEFAULT,
@@ -96,6 +101,7 @@
public StorageDescriptor(
List<FieldSchema> cols,
String location,
+ String filenameFilter,
String inputFormat,
String outputFormat,
boolean compressed,
@@ -108,6 +114,7 @@
this();
this.cols = cols;
this.location = location;
+ this.filenameFilter = filenameFilter;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.compressed = compressed;
@@ -134,6 +141,9 @@
if (other.isSetLocation()) {
this.location = other.location;
}
+ if (other.isSetFilenameFilter()) {
+ this.filenameFilter = other.filenameFilter;
+ }
if (other.isSetInputFormat()) {
this.inputFormat = other.inputFormat;
}
@@ -232,6 +242,23 @@
return this.location != null;
}
+ public String getFilenameFilter() {
+ return this.filenameFilter;
+ }
+
+ public void setFilenameFilter(String filenameFilter) {
+ this.filenameFilter = filenameFilter;
+ }
+
+ public void unsetFilenameFilter() {
+ this.filenameFilter = null;
+ }
+
+ // Returns true if field filenameFilter is set (has been assigned a value) and false otherwise
+ public boolean isSetFilenameFilter() {
+ return this.filenameFilter != null;
+ }
+
public String getInputFormat() {
return this.inputFormat;
}
@@ -429,6 +456,14 @@
}
break;
+ case FILENAMEFILTER:
+ if (value == null) {
+ unsetFilenameFilter();
+ } else {
+ setFilenameFilter((String)value);
+ }
+ break;
+
case INPUTFORMAT:
if (value == null) {
unsetInputFormat();
@@ -506,6 +541,9 @@
case LOCATION:
return getLocation();
+ case FILENAMEFILTER:
+ return getFilenameFilter();
+
case INPUTFORMAT:
return getInputFormat();
@@ -542,6 +580,8 @@
return isSetCols();
case LOCATION:
return isSetLocation();
+ case FILENAMEFILTER:
+ return isSetFilenameFilter();
case INPUTFORMAT:
return isSetInputFormat();
case OUTPUTFORMAT:
@@ -594,6 +634,15 @@
return false;
}
+ boolean this_present_filenameFilter = true && this.isSetFilenameFilter();
+ boolean that_present_filenameFilter = true && that.isSetFilenameFilter();
+ if (this_present_filenameFilter || that_present_filenameFilter) {
+ if (!(this_present_filenameFilter && that_present_filenameFilter))
+ return false;
+ if (!this.filenameFilter.equals(that.filenameFilter))
+ return false;
+ }
+
boolean this_present_inputFormat = true && this.isSetInputFormat();
boolean that_present_inputFormat = true && that.isSetInputFormat();
if (this_present_inputFormat || that_present_inputFormat) {
@@ -710,6 +759,13 @@
TProtocolUtil.skip(iprot, field.type);
}
break;
+ case FILENAMEFILTER:
+ if (field.type == TType.STRING) {
+ this.filenameFilter = iprot.readString();
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
case INPUTFORMAT:
if (field.type == TType.STRING) {
this.inputFormat = iprot.readString();
@@ -833,6 +889,11 @@
oprot.writeString(this.location);
oprot.writeFieldEnd();
}
+ if (this.filenameFilter != null) {
+ oprot.writeFieldBegin(FILENAME_FILTER_FIELD_DESC);
+ oprot.writeString(this.filenameFilter);
+ oprot.writeFieldEnd();
+ }
if (this.inputFormat != null) {
oprot.writeFieldBegin(INPUT_FORMAT_FIELD_DESC);
oprot.writeString(this.inputFormat);
@@ -913,6 +974,14 @@
}
first = false;
if (!first) sb.append(", ");
+ sb.append("filenameFilter:");
+ if (this.filenameFilter == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.filenameFilter);
+ }
+ first = false;
+ if (!first) sb.append(", ");
sb.append("inputFormat:");
if (this.inputFormat == null) {
sb.append("null");
Index: metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java
===================================================================
--- metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java (revision 899787)
+++ metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java (working copy)
@@ -34,6 +34,8 @@
public static final String META_TABLE_LOCATION = "location";
+ public static final String META_TABLE_FILENAME_FILTER = "filename_filter";
+
public static final String META_TABLE_SERDE = "serde";
public static final String META_TABLE_PARTITION_COLUMNS = "partition_columns";
Index: metastore/if/hive_metastore.thrift
===================================================================
--- metastore/if/hive_metastore.thrift (revision 899787)
+++ metastore/if/hive_metastore.thrift (working copy)
@@ -52,14 +52,15 @@
struct StorageDescriptor {
1: list<FieldSchema> cols, // required (refer to types defined above)
2: string location, // defaults to <warehouse loc>/<db loc>/tablename
- 3: string inputFormat, // SequenceFileInputFormat (binary) or TextInputFormat` or custom format
- 4: string outputFormat, // SequenceFileOutputFormat (binary) or IgnoreKeyTextOutputFormat or custom format
- 5: bool compressed, // compressed or not
- 6: i32 numBuckets, // this must be specified if there are any dimension columns
- 7: SerDeInfo serdeInfo, // serialization and deserialization information
- 8: list<string> bucketCols, // reducer grouping columns and clustering columns and bucketing columns`
- 9: list<Order> sortCols, // sort order of the data in each bucket
- 10: map<string, string> parameters // any user supplied key value hash
+ 3: string filenameFilter, // filename regular expression filter
+ 4: string inputFormat, // SequenceFileInputFormat (binary) or TextInputFormat` or custom format
+ 5: string outputFormat, // SequenceFileOutputFormat (binary) or IgnoreKeyTextOutputFormat or custom format
+ 6: bool compressed, // compressed or not
+ 7: i32 numBuckets, // this must be specified if there are any dimension columns
+ 8: SerDeInfo serdeInfo, // serialization and deserialization information
+ 9: list<string> bucketCols, // reducer grouping columns and clustering columns and bucketing columns`
+ 10: list<Order> sortCols, // sort order of the data in each bucket
+ 11: map<string, string> parameters // any user supplied key value hash
}
// table information
@@ -224,6 +225,7 @@
const string META_TABLE_NAME = "name",
const string META_TABLE_DB = "db",
const string META_TABLE_LOCATION = "location",
+const string META_TABLE_FILENAME_FILTER = "filename_filter",
const string META_TABLE_SERDE = "serde",
const string META_TABLE_PARTITION_COLUMNS = "partition_columns",
const string FILE_INPUT_FORMAT = "file.inputformat",
Index: data/files/ext_regex/a_regex.txt
===================================================================
--- data/files/ext_regex/a_regex.txt (revision 0)
+++ data/files/ext_regex/a_regex.txt (revision 0)
@@ -0,0 +1,10 @@
+1|a_regex.txt|Lowell
+2|a_regex.txt|Worcester
+3|a_regex.txt|Roxbury
+4|a_regex.txt|Dedham
+5|a_regex.txt|Mattappan
+6|a_regex.txt|Leominster
+7|a_regex.txt|Shirley
+8|a_regex.txt|Quincy
+9|a_regex.txt|Brookline
+10|a_regex.txt|Brockton
Index: data/files/ext_regex/b_regex.txt
===================================================================
--- data/files/ext_regex/b_regex.txt (revision 0)
+++ data/files/ext_regex/b_regex.txt (revision 0)
@@ -0,0 +1,10 @@
+1|b_regex.txt|Lowell
+2|b_regex.txt|Worcester
+3|b_regex.txt|Roxbury
+4|b_regex.txt|Dedham
+5|b_regex.txt|Mattappan
+6|b_regex.txt|Leominster
+7|b_regex.txt|Shirley
+8|b_regex.txt|Quincy
+9|b_regex.txt|Brookline
+10|b_regex.txt|Brockton
Index: data/files/ext_regex/c_regex.txt
===================================================================
--- data/files/ext_regex/c_regex.txt (revision 0)
+++ data/files/ext_regex/c_regex.txt (revision 0)
@@ -0,0 +1,10 @@
+1|c_regex.txt|Lowell
+2|c_regex.txt|Worcester
+3|c_regex.txt|Roxbury
+4|c_regex.txt|Dedham
+5|c_regex.txt|Mattappan
+6|c_regex.txt|Leominster
+7|c_regex.txt|Shirley
+8|c_regex.txt|Quincy
+9|c_regex.txt|Brookline
+10|c_regex.txt|Brockton
Index: ql/src/test/results/clientpositive/external_filename_filter.q.out
===================================================================
--- ql/src/test/results/clientpositive/external_filename_filter.q.out (revision 0)
+++ ql/src/test/results/clientpositive/external_filename_filter.q.out (revision 0)
@@ -0,0 +1,306 @@
+PREHOOK: query: DROP TABLE tab_abc
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_abc
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE tab_abc (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "file:////Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE tab_abc (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "file:////Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@tab_abc
+PREHOOK: query: DROP TABLE tab_a
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_a
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE tab_a (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^a.*'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE tab_a (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^a.*'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@tab_a
+PREHOOK: query: DROP TABLE tab_b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_b
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE tab_b (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^b.*'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE tab_b (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^b.*'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@tab_b
+PREHOOK: query: DROP TABLE tab_c
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_c
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE tab_c (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^c.*'
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE EXTERNAL TABLE tab_c (
+ id INT,
+ file STRING,
+ name STRING
+ )
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+STORED AS TEXTFILE
+LOCATION "/Users/carl/Projects/hd3/hive-trunk/build/ql/test/data/files/ext_regex"
+FILENAME_FILTER '^c.*'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@tab_c
+PREHOOK: query: SELECT id, file, name from tab_abc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_abc
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/182053365/10000
+POSTHOOK: query: SELECT id, file, name from tab_abc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_abc
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/182053365/10000
+1 a_regex.txt Lowell
+2 a_regex.txt Worcester
+3 a_regex.txt Roxbury
+4 a_regex.txt Dedham
+5 a_regex.txt Mattappan
+6 a_regex.txt Leominster
+7 a_regex.txt Shirley
+8 a_regex.txt Quincy
+9 a_regex.txt Brookline
+10 a_regex.txt Brockton
+1 b_regex.txt Lowell
+2 b_regex.txt Worcester
+3 b_regex.txt Roxbury
+4 b_regex.txt Dedham
+5 b_regex.txt Mattappan
+6 b_regex.txt Leominster
+7 b_regex.txt Shirley
+8 b_regex.txt Quincy
+9 b_regex.txt Brookline
+10 b_regex.txt Brockton
+1 c_regex.txt Lowell
+2 c_regex.txt Worcester
+3 c_regex.txt Roxbury
+4 c_regex.txt Dedham
+5 c_regex.txt Mattappan
+6 c_regex.txt Leominster
+7 c_regex.txt Shirley
+8 c_regex.txt Quincy
+9 c_regex.txt Brookline
+10 c_regex.txt Brockton
+PREHOOK: query: SELECT * FROM tab_abc
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_abc
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/128367836/10000
+POSTHOOK: query: SELECT * FROM tab_abc
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_abc
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/128367836/10000
+1 a_regex.txt Lowell
+2 a_regex.txt Worcester
+3 a_regex.txt Roxbury
+4 a_regex.txt Dedham
+5 a_regex.txt Mattappan
+6 a_regex.txt Leominster
+7 a_regex.txt Shirley
+8 a_regex.txt Quincy
+9 a_regex.txt Brookline
+10 a_regex.txt Brockton
+1 b_regex.txt Lowell
+2 b_regex.txt Worcester
+3 b_regex.txt Roxbury
+4 b_regex.txt Dedham
+5 b_regex.txt Mattappan
+6 b_regex.txt Leominster
+7 b_regex.txt Shirley
+8 b_regex.txt Quincy
+9 b_regex.txt Brookline
+10 b_regex.txt Brockton
+1 c_regex.txt Lowell
+2 c_regex.txt Worcester
+3 c_regex.txt Roxbury
+4 c_regex.txt Dedham
+5 c_regex.txt Mattappan
+6 c_regex.txt Leominster
+7 c_regex.txt Shirley
+8 c_regex.txt Quincy
+9 c_regex.txt Brookline
+10 c_regex.txt Brockton
+PREHOOK: query: SELECT id, file, name from tab_a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_a
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1897634501/10000
+POSTHOOK: query: SELECT id, file, name from tab_a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_a
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1897634501/10000
+1 a_regex.txt Lowell
+2 a_regex.txt Worcester
+3 a_regex.txt Roxbury
+4 a_regex.txt Dedham
+5 a_regex.txt Mattappan
+6 a_regex.txt Leominster
+7 a_regex.txt Shirley
+8 a_regex.txt Quincy
+9 a_regex.txt Brookline
+10 a_regex.txt Brockton
+PREHOOK: query: SELECT * from tab_a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_a
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1968978404/10000
+POSTHOOK: query: SELECT * from tab_a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_a
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1968978404/10000
+1 a_regex.txt Lowell
+2 a_regex.txt Worcester
+3 a_regex.txt Roxbury
+4 a_regex.txt Dedham
+5 a_regex.txt Mattappan
+6 a_regex.txt Leominster
+7 a_regex.txt Shirley
+8 a_regex.txt Quincy
+9 a_regex.txt Brookline
+10 a_regex.txt Brockton
+PREHOOK: query: SELECT id, file, name from tab_b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_b
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1919070567/10000
+POSTHOOK: query: SELECT id, file, name from tab_b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_b
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1919070567/10000
+1 b_regex.txt Lowell
+2 b_regex.txt Worcester
+3 b_regex.txt Roxbury
+4 b_regex.txt Dedham
+5 b_regex.txt Mattappan
+6 b_regex.txt Leominster
+7 b_regex.txt Shirley
+8 b_regex.txt Quincy
+9 b_regex.txt Brookline
+10 b_regex.txt Brockton
+PREHOOK: query: SELECT * from tab_b
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_b
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/348355769/10000
+POSTHOOK: query: SELECT * from tab_b
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_b
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/348355769/10000
+1 b_regex.txt Lowell
+2 b_regex.txt Worcester
+3 b_regex.txt Roxbury
+4 b_regex.txt Dedham
+5 b_regex.txt Mattappan
+6 b_regex.txt Leominster
+7 b_regex.txt Shirley
+8 b_regex.txt Quincy
+9 b_regex.txt Brookline
+10 b_regex.txt Brockton
+PREHOOK: query: SELECT a.id, a.file, b.file, a.name
+FROM tab_a a JOIN tab_b b
+ON a.id = b.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab_b
+PREHOOK: Input: default@tab_a
+PREHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1303397111/10000
+POSTHOOK: query: SELECT a.id, a.file, b.file, a.name
+FROM tab_a a JOIN tab_b b
+ON a.id = b.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab_b
+POSTHOOK: Input: default@tab_a
+POSTHOOK: Output: file:/Users/carl/Projects/hd3/hive-trunk/build/ql/scratchdir/1303397111/10000
+1 a_regex.txt b_regex.txt Lowell
+2 a_regex.txt b_regex.txt Worcester
+3 a_regex.txt b_regex.txt Roxbury
+4 a_regex.txt b_regex.txt Dedham
+5 a_regex.txt b_regex.txt Mattappan
+6 a_regex.txt b_regex.txt Leominster
+7 a_regex.txt b_regex.txt Shirley
+8 a_regex.txt b_regex.txt Quincy
+9 a_regex.txt b_regex.txt Brookline
+10 a_regex.txt b_regex.txt Brockton
+PREHOOK: query: DROP TABLE tab_abc
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_abc
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@tab_abc
+PREHOOK: query: DROP TABLE tab_a
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_a
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@tab_a
+PREHOOK: query: DROP TABLE tab_b
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_b
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@tab_b
+PREHOOK: query: DROP TABLE tab_c
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE tab_c
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@tab_c
Index: ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (revision 899787)
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
@@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,11 +39,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -93,6 +92,42 @@
private HadoopShims.MiniDFSShim dfs = null;
private boolean miniMr = false;
+ private static final Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
+ private static final int MAX_SUBST = 20;
+
+ // adapted from o.a.h.conf.Configuration
+ public static String substituteVars(Properties props, String expr) {
+
+ if (expr == null) {
+ return null;
+ }
+ Matcher match = varPat.matcher("");
+ String eval = expr;
+ for(int s=0; s<MAX_SUBST; s++) {
+ match.reset(eval);
+ if (!match.find()) {
+ return eval;
+ }
+ String var = match.group();
+ var = var.substring(2, var.length()-1); // remove ${ .. }
+ String val = props.getProperty(var);
+ if (val == null) {
+ return eval; // return literal ${var}: var is unbound
+ }
+ // substitute
+ eval = eval.substring(0, match.start()) + val + eval.substring(match.end());
+ }
+ throw new IllegalStateException("Variable substitution depth too large: " + MAX_SUBST + " " + expr);
+ }
+
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (working copy)
- public Table(String name, Properties schema, Deserializer deserializer,
- Class<? extends InputFormat> inputFormatClass,
- Class<? extends OutputFormat> outputFormatClass,
- URI dataLocation, Hive hive) throws HiveException {
- initEmpty();
- this.schema = schema;
- this.deserializer = deserializer; //TODO: convert to SerDeInfo format
- this.getTTable().getSd().getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
- getTTable().setTableName(name);
- getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
- setInputFormatClass(inputFormatClass);
- setOutputFormatClass(HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClass));
- setDataLocation(dataLocation);
- }
-
public Table(String name) {
// fill in defaults
initEmpty();
@@ -217,6 +190,10 @@
final public Path getPath() {
return new Path(getTTable().getSd().getLocation());
}
+
+ final public String getFilenameFilter() {
+ return getTTable().getSd().getFilenameFilter();
+ }
final public String getName() {
return getTTable().getTableName();
@@ -377,6 +354,10 @@
uri = uri2;
getTTable().getSd().setLocation(uri2.toString());
}
+
+ public void setFilenameFilter(String filenameFilter) {
+ getTTable().getSd().setFilenameFilter(filenameFilter);
+ }
public void setBucketCols(List<String> bucketCols) throws HiveException {
if (bucketCols == null) {
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (working copy)
@@ -391,6 +391,7 @@
true, JavaUtils.getClassLoader()));
table.setDeserializer(MetaStoreUtils.getDeserializer(getConf(), p));
table.setDataLocation(new URI(tTable.getSd().getLocation()));
+ table.setFilenameFilter(tTable.getSd().getFilenameFilter());
} catch(Exception e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -396,6 +397,7 @@
Path tblDir = null;
tableDesc tblDesc = null;
+ String tblFilenameFilter = null;
PrunedPartitionList partsList = null;
@@ -460,6 +462,7 @@
tblDir = paths[0];
tblDesc = Utilities.getTableDesc(part.getTable());
+ tblFilenameFilter = tblDesc.getProperties().getProperty(Constants.META_TABLE_FILENAME_FILTER);
}
for (Path p: paths) {
@@ -494,6 +497,11 @@
}
plan.getPathToAliases().get(path).add(alias_id);
plan.getPathToPartitionInfo().put(path, prtDesc);
+
+ if (tblFilenameFilter != null) {
+ plan.setFilenameFilterForAlias(alias_id, tblFilenameFilter);
+ }
+
LOG.debug("Information added for path " + path);
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (working copy)
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -34,7 +35,6 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -45,13 +45,16 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+
/**
* Map operator. This triggers overall map side processing.
* This is a little different from regular operators in that
* it starts off by processing a Writable data structure from
* a Table (instead of a Hive Object).
**/
-public class MapOperator extends Operator implements Serializable {
+public class MapOperator extends Operator implements Serializable {
private static final long serialVersionUID = 1L;
public static enum Counter {DESERIALIZE_ERRORS}
@@ -61,18 +64,40 @@
transient private Object[] rowWithPart;
transient private StructObjectInspector rowObjectInspector;
transient private boolean isPartitioned;
- private Map<MapInputPath, MapOpCtx> opCtxMap;
+
+ private Map<MapInputPath, MapOpCtx> opCtxMap =
+ new HashMap<MapInputPath, MapOpCtx>();
- private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
+ private Map<Operator<? extends Serializable>, List<String>> operatorToPaths =
+ new HashMap<Operator<? extends Serializable>, List<String>>();
+
+ private List<String> childrenPaths = new ArrayList<String>();
- private java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
+ private List<Operator<? extends Serializable>> extraChildrenToClose =
+ new ArrayList<Operator<? extends Serializable>>();
- private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
-
+ private void addOperatorPath(Operator<? extends Serializable> operator, String path) {
+ List<String> pathList = operatorToPaths.get(operator);
+ if (null == pathList) {
+ pathList = new ArrayList<String>();
+ operatorToPaths.put(operator, pathList);
+ }
+ pathList.add(path);
+ }
+
+ private List<String> getOperatorPaths(Operator<? extends Serializable> operator) {
+ List<String> paths = operatorToPaths.get(operator);
+ if (null == paths) {
+ return emptyList();
+ } else {
+ return unmodifiableList(paths);
+ }
+ }
+
private static class MapInputPath {
- String path;
- String alias;
- Operator<? extends Serializable> op;
+ private String path;
+ private String alias;
+ private Operator<? extends Serializable> op;
/**
* @param path
@@ -171,7 +196,7 @@
LinkedHashMap<String, String> partSpec = td.getPartSpec();
Properties tblProps = td.getProperties();
- Class sdclass = td.getDeserializerClass();
+ Class<? extends Deserializer> sdclass = td.getDeserializerClass();
if(sdclass == null) {
String className = td.getSerdeClassName();
if ((className == "") || (className == null)) {
@@ -233,39 +258,42 @@
}
public void setChildren(Configuration hconf) throws HiveException {
-
- Path fpath = new Path((new Path(HiveConf.getVar(hconf,
- HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
- ArrayList<Operator<? extends Serializable>> children =
+ String mapFilename = HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME);
+ Path mapFilePath = new Path((new Path(mapFilename)).toUri().getPath());
+ List<Operator<? extends Serializable>> childOps =
new ArrayList<Operator<? extends Serializable>>();
- opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
- operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>> ();
-
+
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
try {
boolean done = false;
- for (String onefile : conf.getPathToAliases().keySet()) {
- MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile);
- Path onepath = new Path(new Path(onefile).toUri().getPath());
- List<String> aliases = conf.getPathToAliases().get(onefile);
- for (String onealias : aliases) {
- Operator<? extends Serializable> op = conf.getAliasToWork().get(
- onealias);
- LOG.info("Adding alias " + onealias + " to work list for file "
- + fpath.toUri().getPath());
- MapInputPath inp = new MapInputPath(onefile, onealias, op);
+ for (String dirPath : conf.getPaths()) {
+ MapOpCtx opCtx = initObjectInspector(conf, hconf, dirPath);
+
+ for (String tabAlias : conf.getAliasesForPath(dirPath)) {
+ // Skip file if it doesn't match the filter
+ String fileNameFilter = conf.getFileNameFilterForAlias(tabAlias);
+ if (null != fileNameFilter) {
+ if (!Pattern.compile(fileNameFilter).matcher(mapFilePath.getName()).matches()) {
+ continue;
+ }
+ }
+
+ Operator<? extends Serializable> op = conf.getOperatorForAlias(tabAlias);
+ LOG.info("Adding alias " + tabAlias + " to work list for file " + mapFilePath.toUri().getPath());
+
+ MapInputPath inp = new MapInputPath(dirPath, tabAlias, op);
opCtxMap.put(inp, opCtx);
- if(operatorToPaths.get(op) == null)
- operatorToPaths.put(op, new java.util.ArrayList());
- operatorToPaths.get(op).add(onefile);
+ addOperatorPath(op, dirPath);
op.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
op.getParentOperators().add(this);
+
// check for the operators who will process rows coming to this Map Operator
- if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- children.add(op);
- childrenPaths.add(onefile);
+ Path filePath = new Path(new Path(dirPath).toUri().getPath());
+ if (!filePath.toUri().relativize(mapFilePath.toUri()).equals(mapFilePath.toUri())) {
+ childOps.add(op);
+ childrenPaths.add(dirPath);
LOG.info("dump " + op.getName() + " " + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
if (!done) {
deserializer = opCtxMap.get(inp).getDeserializer();
@@ -277,16 +305,16 @@
}
}
}
- if (children.size() == 0) {
+ if (childOps.size() == 0) {
// didn't find match for input file path in configuration!
// serious problem ..
LOG.error("Configuration does not have any alias for path: "
- + fpath.toUri().getPath());
+ + mapFilePath.toUri().getPath());
throw new HiveException("Configuration and input path are inconsistent");
}
// we found all the operators that we are supposed to process.
- setChildOperators(children);
+ setChildOperators(childOps);
} catch (Exception e) {
throw new HiveException(e);
}
@@ -307,9 +335,6 @@
Operator<? extends Serializable> op = input.op;
// op is not in the children list, so need to remember it and close it afterwards
if ( children.indexOf(op) == -1 ) {
- if ( extraChildrenToClose == null ) {
- extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
- }
extraChildrenToClose.add(op);
}
@@ -317,8 +342,7 @@
// below logic is to avoid initialize one operator multiple times if there
// is one input path in this mapper's input paths.
boolean shouldInit = true;
- List<String> paths = operatorToPaths.get(op);
- for(String path: paths) {
+ for(String path : getOperatorPaths(op)) {
if(childrenPaths.contains(path) && !path.equals(input.path)) {
shouldInit = false;
break;
@@ -333,10 +357,8 @@
* close extra child operators that are initialized but are not executed.
*/
public void closeOp(boolean abort) throws HiveException {
- if ( extraChildrenToClose != null ) {
- for (Operator<? extends Serializable> op : extraChildrenToClose) {
- op.close(abort);
- }
+ for (Operator<? extends Serializable> op : extraChildrenToClose) {
+ op.close(abort);
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredInputFilter.java (revision 0)
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+
+public class MapredInputFilter implements PathFilter, Configurable {
+ private Configuration conf;
+ private mapredWork mrWork;
+
+ @Override
+ public boolean accept(Path inFilePath) {
+ String filename = inFilePath.getName();
+ Path tblPath = inFilePath.getParent();
+
+ for (String alias : mrWork.getAliasesForPath(tblPath)) {
+ String filter = mrWork.getFileNameFilterForAlias(alias);
+ if (null == filter || Pattern.compile(filter).matcher(filename).matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ mrWork = Utilities.getMapRedWork(conf);
+ }
+
+}
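Reviewer note: MapredInputFilter is registered on the JobConf by the ExecDriver change below (FileInputFormat.setInputPathFilter); a standalone sketch of that wiring with a made-up input directory, not part of the patch.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.MapredInputFilter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class InputFilterWiringSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/data/ext_regex")); // hypothetical path
    // Hadoop instantiates the filter via ReflectionUtils (which calls setConf)
    // and asks accept() about every candidate input file for the job.
    FileInputFormat.setInputPathFilter(job, MapredInputFilter.class);
  }
}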
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFilenameFilter.java (revision 0)
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class FetchFilenameFilter extends FilenameFilter implements Configurable {
+ private Configuration conf;
+
+ public FetchFilenameFilter() {
+ // zero-arg constructor to keep reflection happy
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ String filter = conf.get(HiveConf.ConfVars.HIVEFETCHFILENAMEFILTER.varname);
+ if (filter != null) {
+ addFilter(filter);
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FilenameFilter.java (revision 0)
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+
+public class FilenameFilter implements PathFilter {
+
+ private List<String> filters = new ArrayList<String>();
+ protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ @Override
+ public boolean accept(Path filePath) {
+ String filename = filePath.getName();
+ // Path fileDir = filePath.getParent();
+
+ for (String filter : filters) {
+ if (Pattern.compile(filter).matcher(filename).matches()) {
+ LOG.info("Accepting " + filePath.toString());
+ return true;
+ }
+ }
+ LOG.info("Rejecting " + filePath.toString());
+ return false;
+ }
+
+ protected void addFilter(String filter) {
+ filters.add(filter);
+ }
+
+ protected List<String> getFilters() {
+ return filters;
+ }
+
+}
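Reviewer note: accept() applies Pattern full matching to the file name only (the parent directory is ignored); a tiny sketch of the expected semantics, not part of the patch.

import java.util.regex.Pattern;

public class FilterSemanticsSketch {
  public static void main(String[] args) {
    // '^a.*' must cover the whole file name, exactly as in FilenameFilter.accept().
    System.out.println(Pattern.compile("^a.*").matcher("a_regex.txt").matches()); // true
    System.out.println(Pattern.compile("^a.*").matcher("b_regex.txt").matches()); // false
  }
}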
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy)
@@ -23,14 +23,16 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.fetchWork;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
@@ -41,7 +43,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -54,6 +55,9 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+import static org.apache.hadoop.util.StringUtils.escapeString;
+
/**
* FetchTask implementation
**/
@@ -69,6 +73,10 @@
this.work = work;
this.job = job;
+ if (work.getFilenameFilter() != null) {
+ job.set(HiveConf.ConfVars.HIVEFETCHFILENAMEFILTER.varname, work.getFilenameFilter());
+ }
+
currRecReader = null;
currPath = null;
currTbl = null;
@@ -81,11 +89,11 @@
private fetchWork work;
private int splitNum;
- private RecordReader currRecReader;
+ private RecordReader<WritableComparable<?>, Writable> currRecReader;
+ private FileInputFormat<WritableComparable<?>, Writable> inputFormat;
private InputSplit[] inputSplits;
- private InputFormat inputFormat;
private JobConf job;
- private WritableComparable key;
+ private WritableComparable<?> key;
private Writable value;
private Deserializer serde;
private Iterator iterPath;
@@ -100,21 +108,23 @@
/**
* A cache of InputFormat instances.
*/
- private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
+ private static Map<Class<?>, FileInputFormat<WritableComparable<?>, Writable>> inputFormats =
+ new HashMap<Class<?>, FileInputFormat<WritableComparable<?>, Writable>>();
- static InputFormat getInputFormatFromCache(
- Class inputFormatClass, Configuration conf) throws IOException {
- if (!inputFormats.containsKey(inputFormatClass)) {
+ static FileInputFormat<WritableComparable<?>, Writable>
+ getInputFormatFromCache(Class<?> inputFormatClass, Configuration conf) throws IOException {
+ FileInputFormat<WritableComparable<?>, Writable> inputFormat = inputFormats.get(inputFormatClass);
+ if (inputFormat == null) {
try {
- InputFormat newInstance = (InputFormat) ReflectionUtils
- .newInstance(inputFormatClass, conf);
- inputFormats.put(inputFormatClass, newInstance);
+ inputFormat =
+ (FileInputFormat<WritableComparable<?>, Writable>) ReflectionUtils.newInstance(inputFormatClass, conf);
+ inputFormats.put(inputFormatClass, inputFormat);
} catch (Exception e) {
throw new IOException("Cannot create an instance of InputFormat class "
+ inputFormatClass.getName() + " as specified in mapredWork!");
}
}
- return inputFormats.get(inputFormatClass);
+ return inputFormat;
}
private void setPrtnDesc() throws Exception {
@@ -124,9 +134,8 @@
String pcols = currPart
.getTableDesc()
.getProperties()
- .getProperty(
- org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
- LinkedHashMap partSpec = currPart.getPartSpec();
+ .getProperty(Constants.META_TABLE_PARTITION_COLUMNS);
+ Map<String, String> partSpec = currPart.getPartSpec();
List partObjectInspectors = new ArrayList();
String[] partKeys = pcols.trim().split("/");
@@ -147,6 +156,7 @@
}
private void getNextPath() throws Exception {
+
// first time
if (iterPath == null) {
if (work.getTblDir() != null) {
@@ -155,8 +165,7 @@
currTbl = work.getTblDesc();
FileSystem fs = currPath.getFileSystem(job);
if (fs.exists(currPath)) {
- FileStatus[] fStats = fs.listStatus(currPath);
- for (FileStatus fStat : fStats) {
+ for (FileStatus fStat : fs.listStatus(currPath)) {
if (fStat.getLen() > 0) {
tblDataDone = true;
break;
@@ -183,20 +192,19 @@
partitionDesc prt = iterPartDesc.next();
FileSystem fs = nxt.getFileSystem(job);
if (fs.exists(nxt)) {
- FileStatus[] fStats = fs.listStatus(nxt);
- for (FileStatus fStat : fStats) {
+ for (FileStatus fStat : fs.listStatus(nxt)) {
if (fStat.getLen() > 0) {
currPath = nxt;
currPart = prt;
- return;
}
}
}
}
}
- private RecordReader getRecordReader()
- throws Exception {
+
+ private RecordReader<WritableComparable<?>, Writable> getRecordReader()
+ throws Exception {
if (currPath == null) {
getNextPath();
if (currPath == null)
@@ -205,13 +213,19 @@
// not using FileInputFormat.setInputPaths() here because it forces a connection
// to the default file system - which may or may not be online during pure metadata
// operations
- job.set("mapred.input.dir",
- org.apache.hadoop.util.StringUtils.escapeString(currPath.toString()));
+ job.set("mapred.input.dir", escapeString(currPath.toString()));
tableDesc tmp = currTbl;
- if (tmp == null)
+ if (tmp == null) {
tmp = currPart.getTableDesc();
+ }
+
inputFormat = getInputFormatFromCache(tmp.getInputFileFormatClass(), job);
+
+ if (work.getFilenameFilter() != null) {
+ FileInputFormat.setInputPathFilter(job, FetchFilenameFilter.class);
+ }
+
inputSplits = inputFormat.getSplits(job, 1);
splitNum = 0;
serde = tmp.getDeserializerClass().newInstance();
@@ -285,14 +299,13 @@
currRecReader = null;
}
} catch (Exception e) {
- throw new HiveException("Failed with exception " + e.getMessage()
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new HiveException("Failed with exception "
+ + e.getMessage() + stringifyException(e));
}
}
public ObjectInspector getOutputObjectInspector() throws HiveException {
try {
- ObjectInspector outInspector;
if (work.getTblDir() != null) {
tableDesc tbl = work.getTblDesc();
Deserializer serde = tbl.getDeserializerClass().newInstance();
@@ -309,8 +322,9 @@
return rowObjectInspector;
}
} catch (Exception e) {
- throw new HiveException("Failed with exception " + e.getMessage()
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ throw new HiveException("Failed with exception "
+ + e.getMessage() + stringifyException(e));
}
}
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -18,48 +18,70 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
+import java.net.URLDecoder;
import java.net.URLEncoder;
-import java.net.URLDecoder;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
-
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.history.HiveHistory;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.io.*;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.varia.NullAppender;
-import java.lang.ClassNotFoundException;
-import org.apache.hadoop.hive.common.FileUtils;
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.commons.lang.StringUtils.split;
+import static org.apache.commons.lang.StringUtils.isEmpty;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+
+
public class ExecDriver extends Task<mapredWork> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -82,7 +104,7 @@
SessionState ss = SessionState.get();
Set<String> files = (ss == null) ? null : ss.list_resource(t, null);
if (files != null) {
- ArrayList<String> realFiles = new ArrayList<String>(files.size());
+ List<String> realFiles = new ArrayList<String>(files.size());
for (String one : files) {
try {
realFiles.add(Utilities.realFile(one, conf));
@@ -91,7 +113,7 @@
+ "due to exception: " + e.getMessage(), e);
}
}
- return StringUtils.join(realFiles, ",");
+ return join(realFiles, ",");
} else {
return "";
}
@@ -118,15 +140,15 @@
// "tmpfiles" and "tmpjars" are set by the method ExecDriver.execute(),
// which will be called by both local and NON-local mode.
String addedFiles = getResourceFiles(job, SessionState.ResourceType.FILE);
- if (StringUtils.isNotBlank(addedFiles)) {
+ if (isNotBlank(addedFiles)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDFILES, addedFiles);
}
String addedJars = getResourceFiles(job, SessionState.ResourceType.JAR);
- if (StringUtils.isNotBlank(addedJars)) {
+ if (isNotBlank(addedJars)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDJARS, addedJars);
}
String addedArchives = getResourceFiles(job, SessionState.ResourceType.ARCHIVE);
- if (StringUtils.isNotBlank(addedArchives)) {
+ if (isNotBlank(addedArchives)) {
HiveConf.setVar(job, ConfVars.HIVEADDEDARCHIVES, addedArchives);
}
}
@@ -135,7 +157,7 @@
* Constructor/Initialization for invocation as independent utility
*/
public ExecDriver(mapredWork plan, JobConf job, boolean isSilent)
- throws HiveException {
+ throws HiveException {
setWork(plan);
this.job = job;
LOG = LogFactory.getLog(this.getClass().getName());
@@ -268,8 +290,7 @@
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
- SimpleDateFormat dateFormat
- = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
long reportTime = System.currentTimeMillis();
long maxReportInterval = 60 * 1000; // One minute
boolean fatal = false;
@@ -305,14 +326,11 @@
String output = dateFormat.format(Calendar.getInstance().getTime()) + report;
SessionState ss = SessionState.get();
if (ss != null) {
- ss.getHiveHistory().setTaskCounters(
- SessionState.get().getQueryId(), getId(), rj);
- ss.getHiveHistory().setTaskProperty(
- SessionState.get().getQueryId(), getId(),
- Keys.TASK_HADOOP_PROGRESS, output);
- ss.getHiveHistory().progressTask(
- SessionState.get().getQueryId(), this);
- ss.getHiveHistory().logPlanProgress(queryPlan);
+ HiveHistory history = ss.getHiveHistory();
+ history.setTaskCounters(ss.getQueryId(), getId(), rj);
+ history.setTaskProperty(ss.getQueryId(), getId(), Keys.TASK_HADOOP_PROGRESS, output);
+ history.progressTask(ss.getQueryId(), this);
+ history.logPlanProgress(queryPlan);
}
console.printInfo(output);
lastReport = report;
@@ -399,7 +417,7 @@
public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
long r = 0;
// For each input path, calculate the total size.
- for (String path: work.getPathToAliases().keySet()) {
+ for (String path : work.getPathToAliases().keySet()) {
try {
Path p = new Path(path);
FileSystem fs = p.getFileSystem(job);
@@ -458,8 +476,7 @@
} catch(IOException e) {
String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
+ e.getMessage();
- console.printError(statusMesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ console.printError(statusMesg, "\n" + stringifyException(e));
return 1;
}
@@ -485,13 +502,16 @@
fs.mkdirs(emptyScratchDir);
break;
} catch (Exception e) {
- if (numTries > 0)
+ if (numTries > 0) {
numTries--;
- else
- throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage());
+ } else {
+ throw new RuntimeException("Failed to make dir "
+ + emptyScratchDir.toString() + " : " + e.getMessage());
+ }
}
}
+ FileInputFormat.setInputPathFilter(job, MapredInputFilter.class);
FileOutputFormat.setOutputPath(job, jobScratchDir);
job.setMapperClass(ExecMapper.class);
@@ -506,8 +526,9 @@
HiveConf.getVar(job, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
- if ((inpFormat == null) || (!StringUtils.isNotBlank(inpFormat)))
+ if (inpFormat == null || !isNotBlank(inpFormat)) {
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
LOG.info("Using " + inpFormat);
@@ -524,10 +545,10 @@
// Transfer HIVEAUXJARS and HIVEADDEDJARS to "tmpjars" so hadoop understands it
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
- if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
+ if (isNotBlank(auxJars) || isNotBlank(addedJars)) {
String allJars =
- StringUtils.isNotBlank(auxJars)
- ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
+ isNotBlank(auxJars)
+ ? (isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
: addedJars;
LOG.info("adding libjars: " + allJars);
initializeFiles("tmpjars", allJars);
@@ -535,20 +556,19 @@
// Transfer HIVEADDEDFILES to "tmpfiles" so hadoop understands it
String addedFiles = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDFILES);
- if (StringUtils.isNotBlank(addedFiles)) {
+ if (isNotBlank(addedFiles)) {
initializeFiles("tmpfiles", addedFiles);
}
// Transfer HIVEADDEDARCHIVES to "tmparchives" so hadoop understands it
String addedArchives = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDARCHIVES);
- if (StringUtils.isNotBlank(addedArchives)) {
+ if (isNotBlank(addedArchives)) {
initializeFiles("tmparchives", addedArchives);
}
int returnVal = 0;
RunningJob rj = null, orig_rj = null;
- boolean noName = StringUtils.isEmpty(HiveConf.
- getVar(job,HiveConf.ConfVars.HADOOPJOBNAME));
+ boolean noName = isEmpty(HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJOBNAME));
if(noName) {
// This is for a special case to ensure unit tests pass
@@ -557,7 +577,7 @@
try {
addInputPaths(job, work, emptyScratchDirStr);
-
+
Utilities.setMapRedWork(job, work);
// remove the pwd from conf file so that job tracker doesn't show this logs
@@ -610,8 +630,7 @@
}
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- console.printError(mesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ console.printError(mesg, "\n" + stringifyException(e));
success = false;
returnVal = 1;
@@ -647,8 +666,7 @@
success = false;
returnVal = 3;
String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
- console.printError(mesg, "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ console.printError(mesg, "\n" + stringifyException(e));
}
}
@@ -658,8 +676,8 @@
private void showJobFailDebugInfo(JobConf conf, RunningJob rj) throws IOException {
Map failures = new HashMap();
- Set successes = new HashSet ();
- Map taskToJob = new HashMap();
+ Set successes = new HashSet();
+ Map taskToJob = new HashMap();
int startIndex = 0;
@@ -801,7 +819,7 @@
URI pathURI = (new Path(planFileName)).toUri();
InputStream pathData;
- if (StringUtils.isEmpty(pathURI.getScheme())) {
+ if (isEmpty(pathURI.getScheme())) {
// default to local file system
pathData = new FileInputStream(planFileName);
} else {
@@ -821,11 +839,11 @@
try {
// see also - code in CliDriver.java
ClassLoader loader = conf.getClassLoader();
- if (StringUtils.isNotBlank(auxJars)) {
- loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","));
+ if (isNotBlank(auxJars)) {
+ loader = Utilities.addToClassPath(loader, split(auxJars, ","));
}
- if (StringUtils.isNotBlank(addedJars)) {
- loader = Utilities.addToClassPath(loader, StringUtils.split(addedJars, ","));
+ if (isNotBlank(addedJars)) {
+ loader = Utilities.addToClassPath(loader, split(addedJars, ","));
}
conf.setClassLoader(loader);
// Also set this to the Thread ContextClassLoader, so new threads will inherit
@@ -984,48 +1002,53 @@
return numEmptyPaths;
}
+
private void addInputPaths(JobConf job, mapredWork work, String hiveScratchDir) throws Exception {
int numEmptyPaths = 0;
List<String> pathsProcessed = new ArrayList<String>();
// AliasToWork contains all the aliases
- for (String oneAlias : work.getAliasToWork().keySet()) {
- LOG.info("Processing alias " + oneAlias);
- List<String> emptyPaths = new ArrayList<String>();
+ for (String opAlias : work.getOperatorAliases()) {
+ LOG.info("Processing alias " + opAlias);
+ List<String> emptyPaths = new ArrayList<String>();
- // The alias may not have any path
- String path = null;
- for (String onefile : work.getPathToAliases().keySet()) {
- List<String> aliases = work.getPathToAliases().get(onefile);
- if (aliases.contains(oneAlias)) {
- path = onefile;
+ // There may not be any path associated with the operator
+ boolean hasPath = false;
+ for (String path : work.getPaths()) {
+ if (work.getAliasesForPath(path).contains(opAlias)) {
+ hasPath = true;
// Multiple aliases can point to the same path - it should be processed only once
- if (pathsProcessed.contains(path))
+ if (pathsProcessed.contains(path)) {
continue;
+ }
+
pathsProcessed.add(path);
- LOG.info("Adding input file " + path);
-
- if (!isEmptyPath(job, path))
+ LOG.info("Adding input directory " + path);
+
+ if (!isEmptyPath(job, path)) {
FileInputFormat.addInputPaths(job, path);
- else
+ } else {
emptyPaths.add(path);
+ }
}
}
// Create a empty file if the directory is empty
- for (String emptyPath : emptyPaths)
- numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, oneAlias);
+ for (String emptyPath : emptyPaths) {
+ numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true, opAlias);
+ }
// If the query references non-existent partitions
// We need to add a empty file, it is not acceptable to change the operator tree
// Consider the query:
// select * from (select count(1) from T union all select count(1) from T2) x;
// If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 rows)
- if (path == null)
- numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias);
+ if (!hasPath) {
+ numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, opAlias);
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy)
@@ -1218,6 +1218,10 @@
tbl.setProperty("comment", crtTbl.getComment());
if (crtTbl.getLocation() != null)
tblStorDesc.setLocation(crtTbl.getLocation());
+
+ if (crtTbl.isExternal()) {
+ tblStorDesc.setFilenameFilter(crtTbl.getFilenameFilter());
+ }
tbl.setInputFormatClass(crtTbl.getInputFormat());
tbl.setOutputFormatClass(crtTbl.getOutputFormat());
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/createTableDesc.java (working copy)
@@ -46,6 +46,7 @@
String inputFormat;
String outputFormat;
String location;
+ String filenameFilter;
String serName;
Map<String, String> mapProp;
boolean ifNotExists;
@@ -58,7 +59,8 @@
String collItemDelim,
String mapKeyDelim, String lineDelim,
String comment, String inputFormat, String outputFormat,
- String location, String serName, Map<String, String> mapProp,
+ String location, String filenameFilter,
+ String serName, Map<String, String> mapProp,
boolean ifNotExists) {
this.tableName = tableName;
this.isExternal = isExternal;
@@ -73,6 +75,7 @@
this.outputFormat = outputFormat;
this.lineDelim = lineDelim;
this.location = location;
+ this.filenameFilter = filenameFilter;
this.mapKeyDelim = mapKeyDelim;
this.numBuckets = numBuckets;
this.partCols = partCols;
@@ -224,6 +227,15 @@
this.location = location;
}
+ @explain(displayName="filenameFilter")
+ public String getFilenameFilter() {
+ return filenameFilter;
+ }
+
+ public void setFilenameFilter(String filenameFilter) {
+ this.filenameFilter = filenameFilter;
+ }
+
@explain(displayName="isExternal")
public boolean isExternal() {
return isExternal;
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/mapredWork.java (working copy)
@@ -18,12 +18,21 @@
package org.apache.hadoop.hive.ql.plan;
-import java.util.*;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import static java.util.Collections.emptySet;
+
@explain(displayName="Map Reduce")
public class mapredWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -31,11 +40,13 @@
// map side work
// use LinkedHashMap to make sure the iteration order is
// deterministic, to ease testing
- private LinkedHashMap<String, ArrayList<String>> pathToAliases;
+ private LinkedHashMap<String, ArrayList<String>> pathToAliases;
- private LinkedHashMap<String, partitionDesc> pathToPartitionInfo;
+ private LinkedHashMap<String, String> aliasToFilenameFilter;
+
+ private LinkedHashMap<String, partitionDesc> pathToPartitionInfo;
- private LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork;
+ private LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork;
private LinkedHashMap<String, partitionDesc> aliasToPartnInfo;
@@ -55,6 +66,7 @@
public mapredWork() {
this.aliasToPartnInfo = new LinkedHashMap<String, partitionDesc>();
+ this.aliasToFilenameFilter = new LinkedHashMap<String, String>();
}
public mapredWork(
@@ -77,6 +89,7 @@
this.numReduceTasks = numReduceTasks;
this.mapLocalWork = mapLocalWork;
this.aliasToPartnInfo = new LinkedHashMap<String, partitionDesc>();
+ this.aliasToFilenameFilter = new LinkedHashMap<String, String>();
}
public String getCommand() {
@@ -93,7 +106,50 @@
public void setPathToAliases(final LinkedHashMap<String, ArrayList<String>> pathToAliases) {
this.pathToAliases = pathToAliases;
}
+
+ public Set<String> getPaths() {
+ return pathToAliases.keySet();
+ }
+
+ public void setAliasToFilenameFilter(LinkedHashMap<String, String> aliasToFilenameFilter) {
+ this.aliasToFilenameFilter = aliasToFilenameFilter;
+ }
+
+ public LinkedHashMap<String, String> getAliasToFilenameFilter() {
+ return aliasToFilenameFilter;
+ }
+
+ public Set<String> getAliasesForPath(String path) {
+ List<String> aliases = pathToAliases.get(path);
+ if (null != aliases) {
+ return new HashSet<String>(aliases);
+ } else {
+ return emptySet();
+ }
+ }
+
+ public Set<String> getAliasesForPath(Path path) {
+ return getAliasesForPath(path.toString());
+ }
+
+ public boolean containsPath(Path path) {
+ return containsPath(path.toUri().getPath());
+ }
+
+ public boolean containsPath(String path) {
+ return getPaths().contains(path);
+ }
+
+ public String getFileNameFilterForAlias(String alias) {
+ return aliasToFilenameFilter.get(alias);
+ }
+
+
+ public void setFilenameFilterForAlias(String alias, String filter) {
+ aliasToFilenameFilter.put(alias, filter);
+ }
+
@explain(displayName="Path -> Partition", normalExplain=false)
public LinkedHashMap<String, partitionDesc> getPathToPartitionInfo() {
return this.pathToPartitionInfo;
@@ -127,6 +183,18 @@
}
+ public Set<String> getAliases() {
+ return aliasToWork.keySet();
+ }
+
+ public Set<String> getOperatorAliases() {
+ return aliasToWork.keySet();
+ }
+
+ public Operator<? extends Serializable> getOperatorForAlias(String alias) {
+ return aliasToWork.get(alias);
+ }
+
/**
* @return the mapredLocalWork
*/
Index: ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/fetchWork.java (working copy)
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.tableDesc;
@@ -29,13 +30,17 @@
public class fetchWork implements Serializable {
private static final long serialVersionUID = 1L;
+ private static final int NO_LIMIT = -1;
+
private String tblDir;
private tableDesc tblDesc;
+
+ private String filenameFilter;
private List<String> partDir;
private List<partitionDesc> partDesc;
- private int limit;
+ private int limit = NO_LIMIT;
/**
* Serialization Null Format for the serde used to fetch data
@@ -45,7 +50,7 @@
public fetchWork() { }
public fetchWork(String tblDir, tableDesc tblDesc) {
- this(tblDir, tblDesc, -1);
+ this(tblDir, tblDesc, NO_LIMIT);
}
public fetchWork(String tblDir, tableDesc tblDesc, int limit) {
@@ -55,7 +60,7 @@
}
public fetchWork(List<String> partDir, List<partitionDesc> partDesc) {
- this(partDir, partDesc, -1);
+ this(partDir, partDesc, NO_LIMIT);
}
public fetchWork(List<String> partDir, List<partitionDesc> partDesc, int limit) {
@@ -79,21 +84,30 @@
return tblDir;
}
- /**
+ /**
+ * @param tblDir the tblDir to set
+ */
+ public void setTblDir(String tblDir) {
+ this.tblDir = tblDir;
+ }
+
+ /**
* @return the tblDir
*/
public Path getTblDirPath() {
return new Path(tblDir);
}
- /**
- * @param tblDir the tblDir to set
- */
- public void setTblDir(String tblDir) {
- this.tblDir = tblDir;
- }
-
- /**
+
+ public void setFilenameFilter(String filenameFilter) {
+ this.filenameFilter = filenameFilter;
+ }
+
+ public String getFilenameFilter() {
+ return filenameFilter;
+ }
+
+ /**
* @return the tblDesc
*/
public tableDesc getTblDesc() {
@@ -111,10 +125,16 @@
* @return the partDir
*/
public List<String> getPartDir() {
- return partDir;
+ return partDir == null ? null : new ArrayList<String>(partDir);
}
-
+ /**
+ * @param partDir the partDir to set
+ */
+ public void setPartDir(List<String> partDir) {
+ this.partDir = partDir == null ? null : new ArrayList<String>(partDir);
+ }
+
public List<Path> getPartDirPath() {
return fetchWork.convertStringToPathArray(partDir);
}
@@ -141,12 +161,6 @@
return pathsStr;
}
- /**
- * @param partDir the partDir to set
- */
- public void setPartDir(List<String> partDir) {
- this.partDir = partDir;
- }
/**
* @return the partDesc
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (working copy)
@@ -121,7 +121,9 @@
LATERAL_VIEW_WITH_JOIN("Join with a lateral view is not supported"),
LATERAL_VIEW_INVALID_CHILD("Lateral view AST with invalid child"),
OUTPUT_SPECIFIED_MULTIPLE_TIMES("The same output cannot be present multiple times: "),
- INVALID_AS("AS clause has an invalid number of aliases");
+ INVALID_AS("AS clause has an invalid number of aliases"),
+ INVALID_FILENAME_FILTER_CLAUSE("Filename filters are only applicable to EXTERNAL tables");
+
private String mesg;
private String SQLState;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (working copy)
@@ -556,7 +556,7 @@
@init { msgs.push("table location specification"); }
@after { msgs.pop(); }
:
- KW_LOCATION locn=StringLiteral -> ^(TOK_TABLELOCATION $locn)
+ KW_LOCATION locn=StringLiteral (KW_FILENAMEFILTER filt=StringLiteral)? -> ^(TOK_TABLELOCATION $locn $filt?)
;
columnNameTypeList
@@ -1518,6 +1518,7 @@
KW_RECORDWRITER: 'RECORDWRITER';
KW_SEMI: 'SEMI';
KW_LATERAL: 'LATERAL';
+KW_FILENAMEFILTER: 'FILENAME_FILTER';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 899787)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -4865,7 +4865,11 @@
Table tab = ((Map.Entry)iter.next()).getValue();
if (!tab.isPartitioned()) {
if (qbParseInfo.getDestToWhereExpr().isEmpty()) {
- fetch = new fetchWork(tab.getPath().toString(), Utilities.getTableDesc(tab), qb.getParseInfo().getOuterQueryLimit());
+ fetch = new fetchWork();
+ fetch.setTblDir(tab.getPath().toString());
+ fetch.setTblDesc(Utilities.getTableDesc(tab));
+ fetch.setFilenameFilter(tab.getFilenameFilter());
+ fetch.setLimit(qb.getParseInfo().getOuterQueryLimit());
noMapRed = true;
inputs.add(new ReadEntity(tab));
}
@@ -4905,7 +4909,10 @@
inputs.add(new ReadEntity(part));
}
- fetch = new fetchWork(listP, partP, qb.getParseInfo().getOuterQueryLimit());
+ fetch = new fetchWork();
+ fetch.setPartDir(listP);
+ fetch.setPartDesc(partP);
+ fetch.setLimit(qb.getParseInfo().getOuterQueryLimit());
noMapRed = true;
}
}
@@ -4926,17 +4933,22 @@
if (qb.getIsQuery()) {
if ((!loadTableWork.isEmpty()) || (loadFileWork.size() != 1))
throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg());
- String cols = loadFileWork.get(0).getColumns();
- String colTypes = loadFileWork.get(0).getColumnTypes();
+ loadFileDesc loadDesc = loadFileWork.get(0);
+
+ String cols = loadDesc.getColumns();
+ String colTypes = loadDesc.getColumnTypes();
- fetch = new fetchWork(new Path(loadFileWork.get(0).getSourceDir()).toString(),
- new tableDesc(LazySimpleSerDe.class, TextInputFormat.class,
- IgnoreKeyTextOutputFormat.class,
- Utilities.makeProperties(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
- org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)),
- qb.getParseInfo().getOuterQueryLimit());
+ fetch = new fetchWork();
+ fetch.setTblDir(new Path(loadDesc.getSourceDir()).toString());
+ fetch.setTblDesc(new tableDesc(LazySimpleSerDe.class,
+ TextInputFormat.class, IgnoreKeyTextOutputFormat.class,
+ Utilities.makeProperties(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
+ org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
+ fetch.setLimit(qb.getParseInfo().getOuterQueryLimit());
fetchTask = TaskFactory.get(fetch, this.conf);
setFetchTask(fetchTask);
@@ -5377,6 +5389,7 @@
String inputFormat = TEXTFILE_INPUT;
String outputFormat = TEXTFILE_OUTPUT;
String location = null;
+ String filter = null;
String serde = null;
Map<String, String> mapProp = null;
boolean ifNotExists = false;
@@ -5527,6 +5540,9 @@
break;
case HiveParser.TOK_TABLELOCATION:
location = unescapeSQLString(child.getChild(0).getText());
+ if (child.getChildCount() == 2) {
+ filter = unescapeSQLString(child.getChild(1).getText());
+ }
break;
default: assert false;
}
@@ -5554,7 +5570,7 @@
sortCols, numBuckets,
fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim,
- comment, inputFormat, outputFormat, location, serde,
+ comment, inputFormat, outputFormat, location, filter, serde,
mapProp, ifNotExists);
validateCreateTable(crtTblDesc);
@@ -5586,7 +5602,7 @@
sortCols, numBuckets,
fieldDelim, fieldEscape,
collItemDelim, mapKeyDelim, lineDelim,
- comment, inputFormat, outputFormat, location, serde,
+ comment, inputFormat, outputFormat, location, filter, serde,
mapProp, ifNotExists);
qb.setTableDesc(crtTblDesc);
@@ -5597,10 +5613,14 @@
}
private void validateCreateTable(createTableDesc crtTblDesc) throws SemanticException {
+
+ if (!crtTblDesc.isExternal() && null != crtTblDesc.getFilenameFilter()) {
+ throw new SemanticException(ErrorMsg.INVALID_FILENAME_FILTER_CLAUSE.getMsg());
+ }
+
// no duplicate column names
// currently, it is a simple n*n algorithm - this can be optimized later if need be
// but it should not be a major bottleneck as the number of columns are anyway not so big
-
if((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) {
// for now make sure that serde exists
if(StringUtils.isEmpty(crtTblDesc.getSerName()) || SerDeUtils.isNativeSerDe(crtTblDesc.getSerName())) {