diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2ddb08f..768cf22 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -223,6 +223,9 @@ // ignore the mapjoin hint HIVEIGNOREMAPJOINHINT("hive.ignore.mapjoin.hint", true), + // Maximum number of footer lines a user can set for a table file. + HIVE_FILE_MAX_FOOTER("hive.file.max.footer", 100), + // Hadoop Configuration Properties // Properties with null values are ignored and exist only for the purpose of giving us // a symbolic name to reference in the Hive source code. Properties with non-null diff --git conf/hive-default.xml.template conf/hive-default.xml.template index b94013a..f973bd4 100644 --- conf/hive-default.xml.template +++ conf/hive-default.xml.template @@ -386,6 +386,12 @@ + hive.file.max.footer + 100 + Maximum number of footer lines a user can define for a table file + + + hive.map.aggr true Whether to use map-side aggregation in Hive Group By queries diff --git data/files/header_footer_table_1/0001.txt data/files/header_footer_table_1/0001.txt new file mode 100644 index 0000000..c242b42 --- /dev/null +++ data/files/header_footer_table_1/0001.txt @@ -0,0 +1,8 @@ +name message 0 +steven hive 1 +dave oozie 2 +xifa phd 3 +chuan hadoop 4 +shanyu senior 5 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_1/0002.txt data/files/header_footer_table_1/0002.txt new file mode 100644 index 0000000..d5db38d --- /dev/null +++ data/files/header_footer_table_1/0002.txt @@ -0,0 +1,8 @@ +name message 0 +steven2 hive 11 +dave2 oozie 12 +xifa2 phd 13 +chuan2 hadoop 14 +shanyu2 senior 15 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_1/0003.txt data/files/header_footer_table_1/0003.txt new file mode 100644 index 0000000..f7a763d --- /dev/null +++ data/files/header_footer_table_1/0003.txt @@ -0,0 +1,4 @@ +name message 0 +david3 oozie 22 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_2/2012/01/01/0001.txt data/files/header_footer_table_2/2012/01/01/0001.txt new file mode 100644 index 0000000..c242b42 --- /dev/null +++ data/files/header_footer_table_2/2012/01/01/0001.txt @@ -0,0 +1,8 @@ +name message 0 +steven hive 1 +dave oozie 2 +xifa phd 3 +chuan hadoop 4 +shanyu senior 5 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_2/2012/01/02/0002.txt data/files/header_footer_table_2/2012/01/02/0002.txt new file mode 100644 index 0000000..d5db38d --- /dev/null +++ data/files/header_footer_table_2/2012/01/02/0002.txt @@ -0,0 +1,8 @@ +name message 0 +steven2 hive 11 +dave2 oozie 12 +xifa2 phd 13 +chuan2 hadoop 14 +shanyu2 senior 15 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_2/2012/01/03/0003.txt data/files/header_footer_table_2/2012/01/03/0003.txt new file mode 100644 index 0000000..f7a763d --- /dev/null +++ data/files/header_footer_table_2/2012/01/03/0003.txt @@ -0,0 +1,4 @@ +name message 0 +david3 oozie 22 +footer1 footer1 0 +footer2 0 \ No newline at end of file diff --git data/files/header_footer_table_3/empty1.txt data/files/header_footer_table_3/empty1.txt new file mode 100644 index 0000000..e69de29 diff --git data/files/header_footer_table_3/empty2.txt data/files/header_footer_table_3/empty2.txt new file mode 100644 index
0000000..e69de29 diff --git itests/qtest/pom.xml itests/qtest/pom.xml index 88e0890..c45c176 100644 --- itests/qtest/pom.xml +++ itests/qtest/pom.xml @@ -36,8 +36,8 @@ false false - stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q - cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q + stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q + cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q 
add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 03fd30a..3a58602 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -41,6 +41,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -78,6 +79,9 @@ import org.apache.hadoop.hive.ql.parse.ParseException; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import 
org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer; @@ -242,6 +246,14 @@ public void initConf() throws Exception { conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); + TableDesc tblDesc = Utilities.defaultTd; + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap(); + pt.put("/tmp/testfolder", partDesc); + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + Utilities.setMapRedWork(conf, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive"); + if (miniMr) { assert dfs != null; assert mr != null; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index fc9b7e4..339008a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveRecordReader; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory; @@ -80,6 +83,9 @@ private PartitionDesc currPart; private TableDesc currTbl; private boolean tblDataDone; + private FooterBuffer footerBuffer = null; + private int headerCount = 0; + private int footerCount = 0; private boolean hasVC; private boolean isPartitioned; @@ -527,6 +533,7 @@ protected void pushRow(InspectableObject row) throws HiveException { public InspectableObject getNextRow() throws IOException { try { while (true) { + boolean opNotEOF = true; if (context != null) { context.resetRow(); } @@ -535,10 +542,49 @@ public InspectableObject getNextRow() throws IOException { if (currRecReader == null) { return null; } + + /** + * Start reading a new file. + * If the file contains a header, skip the header lines before reading the records. + * If the file contains a footer, use a FooterBuffer to cache and remove the footer + * records at the end of the file. + */ + headerCount = 0; + footerCount = 0; + TableDesc table = null; + if (currTbl != null) { + table = currTbl; + } else if (currPart != null) { + table = currPart.getTableDesc(); + } + if (table != null) { + headerCount = Utilities.getHeaderCount(table); + footerCount = Utilities.getFooterCount(table, job); + } + + // Skip header lines. + opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value); + + // Initialize footer buffer.
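+ // The buffer is primed with the first footerCount records; each later read then
+ // returns the oldest buffered record and refills that slot, so the last footerCount
+ // records of the file remain in the buffer and are never emitted as rows.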
+ if (opNotEOF) { + if (footerCount > 0) { + footerBuffer = new FooterBuffer(); + opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value); + } + } } - boolean ret = currRecReader.next(key, value); - if (ret) { + if (opNotEOF && footerBuffer == null) { + /** + * When the file doesn't end after skipping the header lines + * and there are no footer lines, read normally. + */ + opNotEOF = currRecReader.next(key, value); + } + if (opNotEOF && footerBuffer != null) { + opNotEOF = footerBuffer.updateBuffer(job, currRecReader, key, value); + } + if (opNotEOF) { if (operator != null && context != null && context.inputFileChanged()) { // The child operators cleanup if input file has changed try { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java new file mode 100644 index 0000000..932dccf --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java @@ -0,0 +1,92 @@ +package org.apache.hadoop.hive.ql.exec; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.ReflectionUtils; + +public class FooterBuffer { + private ArrayList buffer; + private int cur; + + public FooterBuffer() { + } + + public void setCursor(int cur) { + this.cur = cur; + } + + /** + * Initialize footer buffer in order to keep footer records at the end of file. + * + * @param job + * Current job configuration. + * + * @param recordreader + * Record reader. + * + * @param footerCount + * Number of footer lines in the table files. + * + * @param key + * Key of the record being read. + * + * @param value + * Value of the record being read. + * + * @return Return true if there are 0 or more records left in the file + * after initializing the footer buffer, otherwise return false. + */ + public boolean initializeBuffer(JobConf job, RecordReader recordreader, + int footerCount, WritableComparable key, Writable value) throws IOException { + + // Fill the buffer with key value pairs. + this.buffer = new ArrayList(); + while (buffer.size() < footerCount) { + boolean notEOF = recordreader.next(key, value); + if (!notEOF) { + return false; + } + ObjectPair tem = new ObjectPair(); + tem.setFirst(ReflectionUtils.copy(job, key, tem.getFirst())); + tem.setSecond(ReflectionUtils.copy(job, value, tem.getSecond())); + buffer.add(tem); + } + this.cur = 0; + return true; + } + + /** + * Enqueue the most recently read record, and dequeue the earliest record in the queue. + * + * @param job + * Current job configuration. + * + * @param recordreader + * Record reader. + * + * @param key + * Key of the record being read. + * + * @param value + * Value of the record being read. + * + * @return Return false if the end of the file is reached, otherwise return true.
+ */ + public boolean updateBuffer(JobConf job, RecordReader recordreader, + WritableComparable key, Writable value) throws IOException { + key = ReflectionUtils.copy(job, (WritableComparable)buffer.get(cur).getFirst(), key); + value = ReflectionUtils.copy(job, (Writable)buffer.get(cur).getSecond(), value); + boolean notEOF = recordreader.next(buffer.get(cur).getFirst(), buffer.get(cur).getSecond()); + if (notEOF) { + cur = (++cur) % buffer.size(); + } + return notEOF; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index daf4e4a..3cd57b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -160,12 +161,14 @@ import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -3113,5 +3116,74 @@ public static File createTempDir(String baseDir){ + baseDir + " Giving up after " + MAX_ATTEMPS + " attemps"); } + + /** + * Skip header lines in the table file when reading the record. + * + * @param currRecReader + * Record reader. + * + * @param headerCount + * Number of header lines in the table files. + * + * @param key + * Key of the record being read. + * + * @param value + * Value of the record being read. + * + * @return Return true if there are 0 or more records left in the file + * after skipping all headers, otherwise return false. + */ + public static boolean skipHeader(RecordReader currRecReader, + int headerCount, WritableComparable key, Writable value) throws IOException { + while (headerCount > 0) { + if (!currRecReader.next(key, value)) + return false; + headerCount--; + } + return true; + } + + /** + * Get header line count for a table. + * + * @param table + * Table description for target table. + * + */ + public static int getHeaderCount(TableDesc table) throws IOException { + int headerCount; + try { + headerCount = Integer.parseInt(table.getProperties().getProperty(serdeConstants.HEADER_COUNT, "0")); + } catch (NumberFormatException nfe) { + throw new IOException(nfe); + } + return headerCount; + } + + /** + * Get footer line count for a table. + * + * @param table + * Table description for target table. + * + * @param job + * Job configuration for current job.
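+ *
+ * @return Number of footer lines for the table, 0 if skip.footer.line.count is unset.
+ * @throws IOException If the configured value is not an integer or exceeds hive.file.max.footer.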
+ */ + public static int getFooterCount(TableDesc table, JobConf job) throws IOException { + int footerCount; + try { + footerCount = Integer.parseInt(table.getProperties().getProperty(serdeConstants.FOOTER_COUNT, "0")); + if (footerCount > HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_FILE_MAX_FOOTER)) { + throw new IOException("footer number exceeds the limit defined in hive.file.max.footer"); + } + } catch (NumberFormatException nfe) { + + // The footer line count must be set as an integer. + throw new IOException(nfe); + } + return footerCount; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index dd5cb6b..9e6d3bd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -20,24 +20,38 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.IOContext.Comparison; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.ReflectionUtils; /** This class prepares an IOContext, and provides the ability to perform a binary search on the * data. The binary search can be used by setting the value of inputFormatSorted in the @@ -217,6 +231,10 @@ public float getProgress() throws IOException { } } + private FooterBuffer footerBuffer = null; + private int headerCount = 0; + private int footerCount = 0; + public boolean doNext(K key, V value) throws IOException { if (this.isSorted) { if (this.getIOContext().shouldEndBinarySearch() || @@ -271,7 +289,53 @@ public boolean doNext(K key, V value) throws IOException { } try { - return recordReader.next(key, value); + + /** + * When starting to read a new file, check for header and footer rows. + * If the file contains a header, skip the header lines before reading the records. + * If the file contains a footer, use a FooterBuffer to remove the footer lines + * at the end of the table file. + */ + if (this.ioCxtRef.getCurrentBlockStart() == 0) { + + // Check if the table file has a header to skip.
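+ // The PartitionDesc for the current file is looked up from the map work; its
+ // TableDesc carries the skip.header.line.count / skip.footer.line.count properties.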
+ Path filePath = this.ioCxtRef.getInputPath(); + PartitionDesc part = null; + try { + Map pathToPartitionInfo = Utilities + .getMapWork(jobConf).getPathToPartitionInfo(); + part = HiveFileFormatUtils + .getPartitionDescFromPathRecursively(pathToPartitionInfo, + filePath, IOPrepareCache.get().getPartitionDescMap()); + } catch (Exception e) { + LOG.info("Cannot get partition description from " + this.ioCxtRef.getInputPath() + + " because " + e.getMessage()); + part = null; + } + TableDesc table = (part == null) ? null : part.getTableDesc(); + if (table != null) { + headerCount = Utilities.getHeaderCount(table); + footerCount = Utilities.getFooterCount(table, jobConf); + } + + // If the input contains a header, skip it. + if (!Utilities.skipHeader(recordReader, headerCount, (WritableComparable)key, (Writable)value)) { + return false; + } + if (footerCount > 0) { + footerBuffer = new FooterBuffer(); + if (!footerBuffer.initializeBuffer(jobConf, recordReader, footerCount, (WritableComparable)key, (Writable)value)) { + return false; + } + } + } + if (footerBuffer == null) { + + // The table files have no footer rows; read normally. + return recordReader.next(key, value); + } else { + return footerBuffer.updateBuffer(jobConf, recordReader, (WritableComparable)key, (Writable)value); + } } catch (Exception e) { return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 974a5d6..729ed13 100755 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.Writable; @@ -290,6 +292,18 @@ protected void init(JobConf job) { FileInputFormat.setInputPaths(newjob, dir); newjob.setInputFormat(inputFormat.getClass()); + TableDesc tableDesc = part.getTableDesc(); + int headerCount = 0; + int footerCount = 0; + if (tableDesc != null) { + headerCount = Utilities.getHeaderCount(tableDesc); + footerCount = Utilities.getFooterCount(tableDesc, newjob); + if (headerCount != 0 || footerCount != 0) { + + // The input file has a header or footer, so it cannot be split.
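+ // Header/footer skipping assumes a single reader consumes each file from offset 0,
+ // so raise the minimum split size to force one split per file.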
+ newjob.setLong("mapred.min.split.size", Long.MAX_VALUE); + } + } InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length); for (InputSplit is : iss) { result.add(new HiveInputSplit(is, inputFormatClass.getName())); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java index 85dd975..1047a8d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java @@ -24,13 +24,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.LinkedHashMap; import junit.framework.Assert; import junit.framework.TestCase; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; @@ -127,6 +133,15 @@ private void init() throws IOException { when(rcfReader.getPos()).thenReturn(50L); conf = new JobConf(); conf.setBoolean("hive.input.format.sorted", true); + + TableDesc tblDesc = Utilities.defaultTd; + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap(); + pt.put("/tmp/testfolder", partDesc); + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + Utilities.setMapRedWork(conf, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive"); + hiveSplit = new TestHiveInputSplit(); hbsReader = new TestHiveRecordReader(rcfReader, conf); hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java index 0686d9b..94061af 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import junit.framework.TestCase; @@ -38,6 +39,9 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -70,6 +74,15 @@ protected void setUp() throws IOException { conf = new Configuration(); job = new JobConf(conf); + + TableDesc tblDesc = Utilities.defaultTd; + PartitionDesc partDesc = new PartitionDesc(tblDesc, null); + LinkedHashMap pt = new LinkedHashMap(); + pt.put("/tmp/testfolder", partDesc); + MapredWork mrwork = new MapredWork(); + mrwork.getMapWork().setPathToPartitionInfo(pt); + Utilities.setMapRedWork(job, mrwork,"/tmp/" + 
System.getProperty("user.name") + "/hive"); + fileSystem = FileSystem.getLocal(conf); testDir = new Path(System.getProperty("test.tmp.dir", System.getProperty( "user.dir", new File(".").getAbsolutePath())) diff --git ql/src/test/queries/clientnegative/file_with_header_footer_negative.q ql/src/test/queries/clientnegative/file_with_header_footer_negative.q new file mode 100644 index 0000000..fdccfd9 --- /dev/null +++ ql/src/test/queries/clientnegative/file_with_header_footer_negative.q @@ -0,0 +1,13 @@ +dfs ${system:test.dfs.mkdir} hdfs:///tmp/test/; + +dfs -copyFromLocal ../data/files/header_footer_table_1 hdfs:///tmp/test/header_footer_table_1; + +dfs -copyFromLocal ../data/files/header_footer_table_2 hdfs:///tmp/test/header_footer_table_2; + +CREATE EXTERNAL TABLE header_footer_table_1 (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test/header_footer_table_1' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="200"); + +SELECT * FROM header_footer_table_1; + +DROP TABLE header_footer_table_1; + +dfs -rmr hdfs:///tmp/test; \ No newline at end of file diff --git ql/src/test/queries/clientpositive/file_with_header_footer.q ql/src/test/queries/clientpositive/file_with_header_footer.q new file mode 100644 index 0000000..8b65c78 --- /dev/null +++ ql/src/test/queries/clientpositive/file_with_header_footer.q @@ -0,0 +1,39 @@ +dfs ${system:test.dfs.mkdir} hdfs:///tmp/test/; + +dfs -copyFromLocal ../../data/files/header_footer_table_1 hdfs:///tmp/test/header_footer_table_1; + +dfs -copyFromLocal ../../data/files/header_footer_table_2 hdfs:///tmp/test/header_footer_table_2; + +dfs -copyFromLocal ../../data/files/header_footer_table_3 hdfs:///tmp/test/header_footer_table_3; + +CREATE EXTERNAL TABLE header_footer_table_1 (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test/header_footer_table_1' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); + +SELECT * FROM header_footer_table_1; + +SELECT * FROM header_footer_table_1 WHERE id < 50; + +CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); + +ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=1) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/01'; + +ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=2) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/02'; + +ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=3) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/03'; + +SELECT * FROM header_footer_table_2; + +SELECT * FROM header_footer_table_2 WHERE id < 50; + +CREATE EXTERNAL TABLE emptytable (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test/header_footer_table_3' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); + +SELECT * FROM emptytable; + +SELECT * FROM emptytable WHERE id < 50; + +DROP TABLE header_footer_table_1; + +DROP TABLE header_footer_table_2; + +DROP TABLE emptytable; + +dfs -rmr hdfs:///tmp/test; \ No newline at end of file diff --git ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out new file mode 100644 
index 0000000..fa261b3 --- /dev/null +++ ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out @@ -0,0 +1,14 @@ +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@header_footer_table_1 +PREHOOK: query: SELECT * FROM header_footer_table_1 +PREHOOK: type: QUERY +PREHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM header_footer_table_1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +Failed with exception java.io.IOException:java.io.IOException: footer number exceeds the limit defined in hive.file.max.footer diff --git ql/src/test/results/clientpositive/file_with_header_footer.q.out ql/src/test/results/clientpositive/file_with_header_footer.q.out new file mode 100644 index 0000000..857ec52 --- /dev/null +++ ql/src/test/results/clientpositive/file_with_header_footer.q.out @@ -0,0 +1,165 @@ +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@header_footer_table_1 +PREHOOK: query: SELECT * FROM header_footer_table_1 +PREHOOK: type: QUERY +PREHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM header_footer_table_1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +steven hive 1 +dave oozie 2 +xifa phd 3 +chuan hadoop 4 +shanyu senior 5 +steven2 hive 11 +dave2 oozie 12 +xifa2 phd 13 +chuan2 hadoop 14 +shanyu2 senior 15 +david3 oozie 22 +PREHOOK: query: SELECT * FROM header_footer_table_1 WHERE id < 50 +PREHOOK: type: QUERY +PREHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM header_footer_table_1 WHERE id < 50 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@header_footer_table_1 +#### A masked pattern was here #### +steven hive 1 +dave oozie 2 +xifa phd 3 +chuan hadoop 4 +shanyu senior 5 +steven2 hive 11 +dave2 oozie 12 +xifa2 phd 13 +chuan2 hadoop 14 +shanyu2 senior 15 +david3 oozie 22 +PREHOOK: query: CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2") +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@header_footer_table_2 +#### A masked pattern was here #### +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Input: default@header_footer_table_2 +#### A masked pattern was here #### +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=1 +#### A masked pattern was here #### +PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Input: default@header_footer_table_2 +#### A masked pattern was here #### +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=2 +#### A masked pattern was here #### 
+PREHOOK: type: ALTERTABLE_ADDPARTS +PREHOOK: Input: default@header_footer_table_2 +#### A masked pattern was here #### +POSTHOOK: type: ALTERTABLE_ADDPARTS +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=3 +PREHOOK: query: SELECT * FROM header_footer_table_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@header_footer_table_2 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM header_footer_table_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 +#### A masked pattern was here #### +steven hive 1 2012 1 1 +dave oozie 2 2012 1 1 +xifa phd 3 2012 1 1 +chuan hadoop 4 2012 1 1 +shanyu senior 5 2012 1 1 +steven2 hive 11 2012 1 2 +dave2 oozie 12 2012 1 2 +xifa2 phd 13 2012 1 2 +chuan2 hadoop 14 2012 1 2 +shanyu2 senior 15 2012 1 2 +david3 oozie 22 2012 1 3 +PREHOOK: query: SELECT * FROM header_footer_table_2 WHERE id < 50 +PREHOOK: type: QUERY +PREHOOK: Input: default@header_footer_table_2 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 +PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM header_footer_table_2 WHERE id < 50 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 +POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 +#### A masked pattern was here #### +steven hive 1 2012 1 1 +dave oozie 2 2012 1 1 +xifa phd 3 2012 1 1 +chuan hadoop 4 2012 1 1 +shanyu senior 5 2012 1 1 +steven2 hive 11 2012 1 2 +dave2 oozie 12 2012 1 2 +xifa2 phd 13 2012 1 2 +chuan2 hadoop 14 2012 1 2 +shanyu2 senior 15 2012 1 2 +david3 oozie 22 2012 1 3 +#### A masked pattern was here #### +PREHOOK: type: CREATETABLE +#### A masked pattern was here #### +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@emptytable +PREHOOK: query: SELECT * FROM emptytable +PREHOOK: type: QUERY +PREHOOK: Input: default@emptytable +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM emptytable +POSTHOOK: type: QUERY +POSTHOOK: Input: default@emptytable +#### A masked pattern was here #### +PREHOOK: query: SELECT * FROM emptytable WHERE id < 50 +PREHOOK: type: QUERY +PREHOOK: Input: default@emptytable +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM emptytable WHERE id < 50 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@emptytable +#### A masked pattern was here #### +PREHOOK: query: DROP TABLE header_footer_table_1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@header_footer_table_1 +PREHOOK: Output: default@header_footer_table_1 +POSTHOOK: query: DROP TABLE header_footer_table_1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@header_footer_table_1 +POSTHOOK: Output: default@header_footer_table_1 +PREHOOK: query: DROP TABLE header_footer_table_2 +PREHOOK: type: DROPTABLE +PREHOOK: Input: 
default@header_footer_table_2 +PREHOOK: Output: default@header_footer_table_2 +POSTHOOK: query: DROP TABLE header_footer_table_2 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@header_footer_table_2 +POSTHOOK: Output: default@header_footer_table_2 +PREHOOK: query: DROP TABLE emptytable +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@emptytable +PREHOOK: Output: default@emptytable +POSTHOOK: query: DROP TABLE emptytable +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@emptytable +POSTHOOK: Output: default@emptytable +#### A masked pattern was here #### diff --git serde/if/serde.thrift serde/if/serde.thrift index 2ceb572..31c87ee 100644 --- serde/if/serde.thrift +++ serde/if/serde.thrift @@ -37,6 +37,8 @@ const string LINE_DELIM = "line.delim" const string MAPKEY_DELIM = "mapkey.delim" const string QUOTE_CHAR = "quote.delim" const string ESCAPE_CHAR = "escape.delim" +const string HEADER_COUNT = "skip.header.line.count" +const string FOOTER_COUNT = "skip.footer.line.count" typedef string PrimitiveType typedef string CollectionType diff --git serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java index 22a6168..515cf25 100644 --- serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java +++ serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java @@ -61,6 +61,10 @@ public static final String ESCAPE_CHAR = "escape.delim"; + public static final String HEADER_COUNT = "skip.header.line.count"; + + public static final String FOOTER_COUNT = "skip.footer.line.count"; + public static final String VOID_TYPE_NAME = "void"; public static final String BOOLEAN_TYPE_NAME = "boolean";
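For reference, the following is a minimal standalone sketch of the skipping technique this patch implements. It is hypothetical demo code, not part of the patch, and it uses an ArrayDeque where FooterBuffer uses an ArrayList with a cursor, but the effect is the same: skip the first headerCount lines up front, then delay emission of each line by footerCount positions, so that the last footerCount lines are still buffered, and therefore dropped, when the end of the file is reached.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

public class HeaderFooterSkipDemo {
  public static void main(String[] args) throws IOException {
    int headerCount = 1; // analogous to skip.header.line.count
    int footerCount = 2; // analogous to skip.footer.line.count
    BufferedReader in = new BufferedReader(new FileReader(args[0]));
    try {
      // Skip the header, as Utilities.skipHeader does with a RecordReader.
      for (int i = 0; i < headerCount; i++) {
        if (in.readLine() == null) {
          return; // the file ended inside the header
        }
      }
      // Ring buffer of footerCount lines, as FooterBuffer keeps key/value pairs.
      Deque<String> buffer = new ArrayDeque<String>();
      String line;
      while ((line = in.readLine()) != null) {
        buffer.addLast(line);
        if (buffer.size() > footerCount) {
          // A line with footerCount newer lines behind it cannot be a footer line.
          System.out.println(buffer.removeFirst());
        }
      }
      // Anything still buffered here is the footer and is discarded.
    } finally {
      in.close();
    }
  }
}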