diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index d8ac6ae..c54f405 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -117,10 +117,6 @@ private transient Writable value; private transient Object[] vcValues; - private transient int headerCount; - private transient int footerCount; - private transient FooterBuffer footerBuffer; - private transient StructObjectInspector outputOI; private transient Object[] row; @@ -347,7 +343,6 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException } key = currRecReader.createKey(); value = currRecReader.createValue(); - headerCount = footerCount = 0; return currRecReader; } @@ -447,7 +442,6 @@ protected void flushRow() throws HiveException { public InspectableObject getNextRow() throws IOException { try { while (true) { - boolean opNotEOF = true; if (context != null) { context.resetRow(); } @@ -456,37 +450,9 @@ public InspectableObject getNextRow() throws IOException { if (currRecReader == null) { return null; } - - /** - * Start reading a new file. - * If file contains header, skip header lines before reading the records. - * If file contains footer, used FooterBuffer to cache and remove footer - * records at the end of the file. - */ - headerCount = Utilities.getHeaderCount(currDesc.getTableDesc()); - footerCount = Utilities.getFooterCount(currDesc.getTableDesc(), job); - - // Skip header lines. - opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value); - - // Initialize footer buffer. - if (opNotEOF && footerCount > 0) { - footerBuffer = new FooterBuffer(); - opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value); - } - } - - if (opNotEOF && footerBuffer == null) { - /** - * When file doesn't end after skipping header line - * and there is no footer lines, read normally. - */ - opNotEOF = currRecReader.next(key, value); - } - if (opNotEOF && footerBuffer != null) { - opNotEOF = footerBuffer.updateBuffer(job, currRecReader, key, value); } - if (opNotEOF) { + boolean ret = currRecReader.next(key, value); + if (ret) { if (operator != null && context != null && context.inputFileChanged()) { // The child operators cleanup if input file has changed operator.cleanUpInputFileChanged(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java deleted file mode 100644 index 6a407af..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.hadoop.hive.common.ObjectPair; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; - -public class FooterBuffer { - private ArrayList<ObjectPair> buffer; - private int cur; - - public FooterBuffer() { - } - - public void setCursor(int cur) { - this.cur = cur; - } - - /** - * Initialize footer buffer in order to keep footer records at the end of file. - * - * @param job - * Current job configuration. - * - * @param recordreader - * Record reader. - * - * @param footerCount - * Footer line number of the table files. - * - * @param key - * Key of current reading record. - * - * @param value - * Value of current reading record. - * - * @return Return true if there are 0 or more records left in the file - * after initializing the footer buffer, otherwise return false. - */ - public boolean initializeBuffer(JobConf job, RecordReader recordreader, - int footerCount, WritableComparable key, Writable value) throws IOException { - - // Fill the buffer with key value pairs. - this.buffer = new ArrayList<ObjectPair>(); - while (buffer.size() < footerCount) { - boolean notEOF = recordreader.next(key, value); - if (!notEOF) { - return false; - } - ObjectPair tem = new ObjectPair(); - tem.setFirst(ReflectionUtils.copy(job, key, tem.getFirst())); - tem.setSecond(ReflectionUtils.copy(job, value, tem.getSecond())); - buffer.add(tem); - } - this.cur = 0; - return true; - } - - /** - * Enqueue most recent record read, and dequeue earliest result in the queue. - * - * @param job - * Current job configuration. - * - * @param recordreader - * Record reader. - * - * @param key - * Key of current reading record. - * - * @param value - * Value of current reading record. - * - * @return Return false if reaches the end of file, otherwise return true. 
- */ - public boolean updateBuffer(JobConf job, RecordReader recordreader, - WritableComparable key, Writable value) throws IOException { - key = ReflectionUtils.copy(job, (WritableComparable)buffer.get(cur).getFirst(), key); - value = ReflectionUtils.copy(job, (Writable)buffer.get(cur).getSecond(), value); - boolean notEOF = recordreader.next(buffer.get(cur).getFirst(), buffer.get(cur).getSecond()); - if (notEOF) { - cur = (++cur) % buffer.size(); - } - return notEOF; - } - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index bdda89a..98a15b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -423,13 +423,13 @@ public int execute(DriverContext driverContext) { Map<Map<String, String>, Partition> dp = db.loadDynamicPartitions( tbd.getSourcePath(), - tbd.getTable().getTableName(), + tbd.getTable(), tbd.getPartitionSpec(), tbd.getReplace(), dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask()); + SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), this.queryState.getHiveOperation()); console.printInfo("\t Time taken to load dynamic partitions: " + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); @@ -485,10 +485,11 @@ public int execute(DriverContext driverContext) { List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); db.validatePartitionNameCharacters(partVals); - db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), + db.loadPartition(tbd.getSourcePath(), tbd.getTable(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask()); + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, + hasFollowingStatsTask(), this.queryState.getHiveOperation()); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); if (bucketCols != null || sortCols != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 449bef8..0ba2fa5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -46,7 +46,6 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLTransientException; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; @@ -174,7 +173,6 @@ import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.mapred.FileInputFormat; @@ -183,7 +181,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -3348,34 +3345,6 @@ public static File 
createTempDir(String baseDir){ } /** - * Skip header lines in the table file when reading the record. - * - * @param currRecReader - * Record reader. - * - * @param headerCount - * Header line number of the table files. - * - * @param key - * Key of current reading record. - * - * @param value - * Value of current reading record. - * - * @return Return true if there are 0 or more records left in the file - * after skipping all headers, otherwise return false. - */ - public static boolean skipHeader(RecordReader currRecReader, - int headerCount, WritableComparable key, Writable value) throws IOException { - while (headerCount > 0) { - if (!currRecReader.next(key, value)) - return false; - headerCount--; - } - return true; - } - - /** * Get header line count for a table. * * @param table diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java index 4a05a62..1f0dba5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java @@ -27,20 +27,14 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.FooterBuffer; import org.apache.hadoop.hive.ql.io.IOContext.Comparison; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -240,9 +234,6 @@ public float getProgress() throws IOException { } } - private FooterBuffer footerBuffer = null; - private int headerCount = 0; - private int footerCount = 0; public boolean doNext(K key, V value) throws IOException { if (this.isSorted) { @@ -296,64 +287,7 @@ public boolean doNext(K key, V value) throws IOException { } } } - - try { - - /** - * When start reading new file, check header, footer rows. - * If file contains header, skip header lines before reading the records. - * If file contains footer, used a FooterBuffer to remove footer lines - * at the end of the table file. - **/ - if (this.ioCxtRef.getCurrentBlockStart() == 0) { - - // Check if the table file has header to skip. 
- Path filePath = this.ioCxtRef.getInputPath(); - PartitionDesc part = null; - try { - if (pathToPartitionInfo == null) { - pathToPartitionInfo = Utilities - .getMapWork(jobConf).getPathToPartitionInfo(); - } - part = HiveFileFormatUtils - .getPartitionDescFromPathRecursively(pathToPartitionInfo, - filePath, IOPrepareCache.get().getPartitionDescMap()); - } catch (AssertionError ae) { - LOG.info("Cannot get partition description from " + this.ioCxtRef.getInputPath() - + "because " + ae.getMessage()); - part = null; - } catch (Exception e) { - LOG.info("Cannot get partition description from " + this.ioCxtRef.getInputPath() - + "because " + e.getMessage()); - part = null; - } - TableDesc table = (part == null) ? null : part.getTableDesc(); - if (table != null) { - headerCount = Utilities.getHeaderCount(table); - footerCount = Utilities.getFooterCount(table, jobConf); - } - - // If input contains header, skip header. - if (!Utilities.skipHeader(recordReader, headerCount, (WritableComparable)key, (Writable)value)) { - return false; - } - if (footerCount > 0) { - footerBuffer = new FooterBuffer(); - if (!footerBuffer.initializeBuffer(jobConf, recordReader, footerCount, (WritableComparable)key, (Writable)value)) { - return false; - } - } - } - if (footerBuffer == null) { - - // Table files don't have footer rows. - return recordReader.next(key, value); - } else { - return footerBuffer.updateBuffer(jobConf, recordReader, (WritableComparable)key, (Writable)value); - } - } catch (Exception e) { - return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf); - } + return recordReader.next(key, value); } private void sync(long position) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 6cacc59..bfc5f51 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -31,18 +31,8 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.Stack; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,10 +44,10 @@ import javax.jdo.JDODataStoreException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -124,9 +114,7 @@ import org.apache.hadoop.hive.ql.index.HiveIndexHandler; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.TableDesc; import 
org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; @@ -135,6 +123,7 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.thrift.TException; @@ -157,6 +146,7 @@ public class Hive { static final private Logger LOG = LoggerFactory.getLogger("hive.ql.metadata.Hive"); + private static final int BUFFER_SIZE = 1048576; // 1 MB copy buffer private HiveConf conf = null; private IMetaStoreClient metaStoreClient; @@ -1450,13 +1440,13 @@ public Database getDatabaseCurrent() throws HiveException { return getDatabase(currentDb); } - public void loadPartition(Path loadPath, String tableName, + public void loadPartition(Path loadPath, TableDesc tbd, Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { - Table tbl = getTable(tableName); + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, HiveOperation op) throws HiveException { + Table tbl = getTable(tbd.getTableName()); loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, - isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask); + isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, tbd, op); } /** @@ -1478,12 +1468,18 @@ public void loadPartition(Path loadPath, String tableName, * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL - * @param isAcid true if this is an ACID operation + * @param isAcid + * true if this is an ACID operation + * @param hasFollowingStatsTask + * true if there is a stats task following the move task + * @param tbd + * description of the table to be loaded + * @param op + * the operation being executed; header/footer stripping only applies to LOAD */ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, - boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException { + boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, + TableDesc tbd, HiveOperation op) throws HiveException { Path tblDataLocationPath = tbl.getDataLocation(); try { /** @@ -1532,8 +1528,15 @@ public Partition loadPartition(Path loadPath, Table tbl, if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) { newFiles = Collections.synchronizedList(new ArrayList<Path>()); } FileSystem fs = tbl.getDataLocation().getFileSystem(conf); + boolean stripped = false; + if (HiveOperation.LOAD.equals(op) && + tbl.getInputFormatClass().equals(org.apache.hadoop.mapred.TextInputFormat.class)) { + int headerCount = Utilities.getHeaderCount(tbd); + int footerCount = Utilities.getFooterCount(tbd, new JobConf(conf)); + if (headerCount > 0 || footerCount > 0) { + // Strip the header/footer lines once at load time, since they are + // no longer skipped at read time. + Hive.copyFileWithHeaderFooter(isSrcLocal, headerCount, footerCount, conf, fs, loadPath, newPartPath); + stripped = true; + } + } + if (!stripped) { Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); + } } Partition newTPart = oldPart != null ? 
oldPart : new Partition(tbl, partSpec, newPartPath); @@ -1691,8 +1694,9 @@ private void constructOneLBLocationMap(FileStatus fSta, * @throws HiveException */ public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath, - String tableName, Map<String, String> partSpec, boolean replace, - int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask) + TableDesc tblDesc, Map<String, String> partSpec, boolean replace, + int numDP, boolean listBucketingEnabled, boolean isAcid, long txnId, boolean hasFollowingStatsTask, + HiveOperation op) throws HiveException { Set<Path> validPartitions = new HashSet<Path>(); @@ -1720,7 +1724,7 @@ private void constructOneLBLocationMap(FileStatus fSta, + " to at least " + partsToLoad + '.'); } - Table tbl = getTable(tableName); + Table tbl = getTable(tblDesc.getTableName()); // for each dynamically created DP directory, construct a full partition spec // and load the partition based on that Iterator<Path> iter = validPartitions.iterator(); @@ -1742,7 +1746,7 @@ private void constructOneLBLocationMap(FileStatus fSta, LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec); Warehouse.makeSpecFromName(fullPartSpec, partPath); Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace, - true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask); + true, listBucketingEnabled, false, isAcid, hasFollowingStatsTask, tblDesc, op); partitionsMap.put(fullPartSpec, newPartition); if (inPlaceEligible) { InPlaceUpdates.rePositionCursor(ps); @@ -2975,6 +2979,131 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, } } + private static void copyFileWithHeaderFooter(boolean isSrcLocal, int headerCount, int footerCount, HiveConf conf, FileSystem destFs, Path srcPath, Path destPath) + throws HiveException { + boolean inheritPerms = HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); + try { + // create the destination if it does not exist + if (!destFs.exists(destPath)) { + FileUtils.mkdir(destFs, destPath, inheritPerms, conf); + } + } catch (IOException e) { + throw new HiveException( + "copyFileWithHeaderFooter: error while checking/creating destination directory", + e); + } + + FileSystem srcFs; + try { + srcFs = srcPath.getFileSystem(conf); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException("copyFileWithHeaderFooter: filesystem error in check phase. " + e.getMessage(), e); + } + byte[] bytes = new byte[BUFFER_SIZE]; + int readByte; + FSDataInputStream in = null; + FSDataOutputStream out = null; + try { + in = srcFs.open(srcPath); + out = destFs.create(new Path(destPath, srcPath.getName())); + int startPosition; + int endPosition = 0; + if (footerCount == 0) { + startPosition = skipHeader(in, headerCount); + } else { + int[] stEnd = skipHeaderAndFooter(in, headerCount, footerCount); + startPosition = stEnd[0]; + endPosition = stEnd[1]; + } + if (startPosition != 0) { + // startPosition is the offset of the newline ending the last header + // line; the data starts at the next byte. + startPosition++; + } + in.seek(startPosition); + if (endPosition == 0) { + // No footer: stream the remainder of the file. + while ((readByte = in.read(bytes)) != -1) { + out.write(bytes, 0, readByte); + } + } else { + // Copy up to and including the newline that ends the last data line. + long remaining = endPosition - startPosition + 1; + while (remaining > 0 && (readByte = in.read(bytes)) != -1) { + int toWrite = (int) Math.min(readByte, remaining); + out.write(bytes, 0, toWrite); + remaining -= toWrite; + } + } + out.flush(); + if (!isSrcLocal) { + srcFs.delete(srcPath, true); + } + } catch (IOException e) { + LOG.error("Failed to copy " + srcPath + " to " + destPath, e); + throw new HiveException("Failed to copy " + srcPath + " to " + destPath, e); + } finally { + IOUtils.closeQuietly(in); + IOUtils.closeQuietly(out); + } + } + + private static int skipHeader(FSDataInputStream in, int headerCount) throws IOException { + // Scan forward for the newline that ends the last header line and return its offset. + if (headerCount <= 0) { + return 0; + } + int startPosition = 0; + int readByte; + byte[] bytes = new byte[BUFFER_SIZE]; + while ((readByte = in.read(bytes)) != -1) { + for (int i = 0; i < readByte; i++) { + if (bytes[i] == '\n' && --headerCount == 0) { + return startPosition + i; + } + } + startPosition += readByte; + } + throw new IOException("Header line count exceeds the number of lines in the file."); + } + + private static int[] skipHeaderAndFooter(FSDataInputStream in, int headerCount, int footerCount) + throws IOException { + // Single pass over the file: record the offset of every newline, note where + // the header ends, then walk back from EOF to find where the footer begins. + // Assumes every line, including the last, is newline-terminated. + int startPosition = 0; + int endPosition = 0; + int buff = 0; + int readByte; + byte[] bytes = new byte[BUFFER_SIZE]; + Stack<Integer> stack = new Stack<Integer>(); + while ((readByte = in.read(bytes)) != -1) { + for (int i = 0; i < readByte; i++) { + if (bytes[i] == '\n') { + stack.push(i + buff); + if (--headerCount == 0) { + startPosition = i + buff; + } + } + } + buff += readByte; + } + if (stack.size() <= footerCount) { + throw new IOException("Footer line count exceeds the number of lines in the file."); + } + // Pop the footer's newlines plus the one that ends the last data line. + for (int i = 0; i <= footerCount; i++) { + endPosition = stack.pop(); + } + return new int[] {startPosition, endPosition}; + } + + private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List<Path> newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta/bucket diff --git a/ql/src/test/queries/clientnegative/file_with_header_footer_negative.q b/ql/src/test/queries/clientnegative/file_with_header_footer_negative.q deleted file mode 100644 index 1f63d94..0000000 --- a/ql/src/test/queries/clientnegative/file_with_header_footer_negative.q +++ /dev/null @@ -1,13 +0,0 @@ -dfs ${system:test.dfs.mkdir} hdfs:///tmp/test_file_with_header_footer_negative/; - -dfs -copyFromLocal ../../data/files/header_footer_table_1 hdfs:///tmp/test_file_with_header_footer_negative/header_footer_table_1; - -dfs -copyFromLocal ../../data/files/header_footer_table_2 hdfs:///tmp/test_file_with_header_footer_negative/header_footer_table_2; - -CREATE EXTERNAL TABLE header_footer_table_1 (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test_file_with_header_footer_negative/header_footer_table_1' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="200"); - -SELECT * FROM header_footer_table_1; - -DROP TABLE header_footer_table_1; - -dfs -rmr hdfs:///tmp/test_file_with_header_footer_negative; diff --git a/ql/src/test/queries/clientpositive/file_with_header_footer.q b/ql/src/test/queries/clientpositive/file_with_header_footer.q deleted file mode 100644 index 8913e54..0000000 --- a/ql/src/test/queries/clientpositive/file_with_header_footer.q +++ /dev/null @@ -1,40 +0,0 @@ -set hive.mapred.mode=nonstrict; -dfs ${system:test.dfs.mkdir} hdfs:///tmp/test/; - -dfs -copyFromLocal ../../data/files/header_footer_table_1 hdfs:///tmp/test/header_footer_table_1; - -dfs -copyFromLocal 
../../data/files/header_footer_table_2 hdfs:///tmp/test/header_footer_table_2; - -dfs -copyFromLocal ../../data/files/header_footer_table_3 hdfs:///tmp/test/header_footer_table_3; - -CREATE EXTERNAL TABLE header_footer_table_1 (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test/header_footer_table_1' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); - -SELECT * FROM header_footer_table_1; - -SELECT * FROM header_footer_table_1 WHERE id < 50; - -CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); - -ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=1) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/01'; - -ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=2) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/02'; - -ALTER TABLE header_footer_table_2 ADD PARTITION (year=2012, month=1, day=3) location 'hdfs:///tmp/test/header_footer_table_2/2012/01/03'; - -SELECT * FROM header_footer_table_2; - -SELECT * FROM header_footer_table_2 WHERE id < 50; - -CREATE EXTERNAL TABLE emptytable (name string, message string, id int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs:///tmp/test/header_footer_table_3' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2"); - -SELECT * FROM emptytable; - -SELECT * FROM emptytable WHERE id < 50; - -DROP TABLE header_footer_table_1; - -DROP TABLE header_footer_table_2; - -DROP TABLE emptytable; - -dfs -rmr hdfs:///tmp/test; \ No newline at end of file diff --git a/ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out b/ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out deleted file mode 100644 index 1794ae6..0000000 --- a/ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out +++ /dev/null @@ -1,19 +0,0 @@ -#### A masked pattern was here #### -PREHOOK: type: CREATETABLE -#### A masked pattern was here #### -PREHOOK: Output: database:default -PREHOOK: Output: default@header_footer_table_1 -#### A masked pattern was here #### -POSTHOOK: type: CREATETABLE -#### A masked pattern was here #### -POSTHOOK: Output: database:default -POSTHOOK: Output: default@header_footer_table_1 -PREHOOK: query: SELECT * FROM header_footer_table_1 -PREHOOK: type: QUERY -PREHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM header_footer_table_1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -Failed with exception java.io.IOException:java.io.IOException: footer number exceeds the limit defined in hive.file.max.footer diff --git a/ql/src/test/results/clientpositive/file_with_header_footer.q.out b/ql/src/test/results/clientpositive/file_with_header_footer.q.out deleted file mode 100644 index ca3dadb..0000000 --- a/ql/src/test/results/clientpositive/file_with_header_footer.q.out +++ /dev/null @@ -1,184 +0,0 @@ -#### A masked pattern was here #### -PREHOOK: type: CREATETABLE -#### A masked pattern was here #### -PREHOOK: Output: database:default -PREHOOK: Output: default@header_footer_table_1 -#### A masked pattern was here #### -POSTHOOK: type: CREATETABLE -#### A masked pattern was here #### -POSTHOOK: Output: database:default -POSTHOOK: 
Output: default@header_footer_table_1 -PREHOOK: query: SELECT * FROM header_footer_table_1 -PREHOOK: type: QUERY -PREHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM header_footer_table_1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -steven hive 1 -dave oozie 2 -xifa phd 3 -chuan hadoop 4 -shanyu senior 5 -steven2 hive 11 -dave2 oozie 12 -xifa2 phd 13 -chuan2 hadoop 14 -shanyu2 senior 15 -david3 oozie 22 -PREHOOK: query: SELECT * FROM header_footer_table_1 WHERE id < 50 -PREHOOK: type: QUERY -PREHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM header_footer_table_1 WHERE id < 50 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@header_footer_table_1 -#### A masked pattern was here #### -steven hive 1 -dave oozie 2 -xifa phd 3 -chuan hadoop 4 -shanyu senior 5 -steven2 hive 11 -dave2 oozie 12 -xifa2 phd 13 -chuan2 hadoop 14 -shanyu2 senior 15 -david3 oozie 22 -PREHOOK: query: CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2") -PREHOOK: type: CREATETABLE -PREHOOK: Output: database:default -PREHOOK: Output: default@header_footer_table_2 -POSTHOOK: query: CREATE EXTERNAL TABLE header_footer_table_2 (name string, message string, id int) PARTITIONED BY (year int, month int, day int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' tblproperties ("skip.header.line.count"="1", "skip.footer.line.count"="2") -POSTHOOK: type: CREATETABLE -POSTHOOK: Output: database:default -POSTHOOK: Output: default@header_footer_table_2 -#### A masked pattern was here #### -PREHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -PREHOOK: Output: default@header_footer_table_2 -#### A masked pattern was here #### -POSTHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -POSTHOOK: Output: default@header_footer_table_2 -POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=1 -#### A masked pattern was here #### -PREHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -PREHOOK: Output: default@header_footer_table_2 -#### A masked pattern was here #### -POSTHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -POSTHOOK: Output: default@header_footer_table_2 -POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=2 -#### A masked pattern was here #### -PREHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -PREHOOK: Output: default@header_footer_table_2 -#### A masked pattern was here #### -POSTHOOK: type: ALTERTABLE_ADDPARTS -#### A masked pattern was here #### -POSTHOOK: Output: default@header_footer_table_2 -POSTHOOK: Output: default@header_footer_table_2@year=2012/month=1/day=3 -PREHOOK: query: SELECT * FROM header_footer_table_2 -PREHOOK: type: QUERY -PREHOOK: Input: default@header_footer_table_2 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM header_footer_table_2 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@header_footer_table_2 -POSTHOOK: Input: 
default@header_footer_table_2@year=2012/month=1/day=1 -POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 -POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 -#### A masked pattern was here #### -steven hive 1 2012 1 1 -dave oozie 2 2012 1 1 -xifa phd 3 2012 1 1 -chuan hadoop 4 2012 1 1 -shanyu senior 5 2012 1 1 -steven2 hive 11 2012 1 2 -dave2 oozie 12 2012 1 2 -xifa2 phd 13 2012 1 2 -chuan2 hadoop 14 2012 1 2 -shanyu2 senior 15 2012 1 2 -david3 oozie 22 2012 1 3 -PREHOOK: query: SELECT * FROM header_footer_table_2 WHERE id < 50 -PREHOOK: type: QUERY -PREHOOK: Input: default@header_footer_table_2 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 -PREHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM header_footer_table_2 WHERE id < 50 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@header_footer_table_2 -POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=1 -POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=2 -POSTHOOK: Input: default@header_footer_table_2@year=2012/month=1/day=3 -#### A masked pattern was here #### -steven hive 1 2012 1 1 -dave oozie 2 2012 1 1 -xifa phd 3 2012 1 1 -chuan hadoop 4 2012 1 1 -shanyu senior 5 2012 1 1 -steven2 hive 11 2012 1 2 -dave2 oozie 12 2012 1 2 -xifa2 phd 13 2012 1 2 -chuan2 hadoop 14 2012 1 2 -shanyu2 senior 15 2012 1 2 -david3 oozie 22 2012 1 3 -#### A masked pattern was here #### -PREHOOK: type: CREATETABLE -#### A masked pattern was here #### -PREHOOK: Output: database:default -PREHOOK: Output: default@emptytable -#### A masked pattern was here #### -POSTHOOK: type: CREATETABLE -#### A masked pattern was here #### -POSTHOOK: Output: database:default -POSTHOOK: Output: default@emptytable -PREHOOK: query: SELECT * FROM emptytable -PREHOOK: type: QUERY -PREHOOK: Input: default@emptytable -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM emptytable -POSTHOOK: type: QUERY -POSTHOOK: Input: default@emptytable -#### A masked pattern was here #### -PREHOOK: query: SELECT * FROM emptytable WHERE id < 50 -PREHOOK: type: QUERY -PREHOOK: Input: default@emptytable -#### A masked pattern was here #### -POSTHOOK: query: SELECT * FROM emptytable WHERE id < 50 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@emptytable -#### A masked pattern was here #### -PREHOOK: query: DROP TABLE header_footer_table_1 -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@header_footer_table_1 -PREHOOK: Output: default@header_footer_table_1 -POSTHOOK: query: DROP TABLE header_footer_table_1 -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@header_footer_table_1 -POSTHOOK: Output: default@header_footer_table_1 -PREHOOK: query: DROP TABLE header_footer_table_2 -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@header_footer_table_2 -PREHOOK: Output: default@header_footer_table_2 -POSTHOOK: query: DROP TABLE header_footer_table_2 -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@header_footer_table_2 -POSTHOOK: Output: default@header_footer_table_2 -PREHOOK: query: DROP TABLE emptytable -PREHOOK: type: DROPTABLE -PREHOOK: Input: default@emptytable -PREHOOK: Output: default@emptytable -POSTHOOK: query: DROP TABLE emptytable -POSTHOOK: type: DROPTABLE -POSTHOOK: Input: default@emptytable -POSTHOOK: Output: default@emptytable -#### A masked pattern was here #### diff --git 
a/ql/src/test/results/clientpositive/skiphf_aggr.q.out b/ql/src/test/results/clientpositive/skiphf_aggr.q.out index aeb4b1b..f28192d 100644 --- a/ql/src/test/results/clientpositive/skiphf_aggr.q.out +++ b/ql/src/test/results/clientpositive/skiphf_aggr.q.out @@ -46,9 +46,11 @@ POSTHOOK: Input: default@skiphtbl POSTHOOK: Input: default@skiphtbl@b=1 POSTHOOK: Input: default@skiphtbl@b=2 #### A masked pattern was here #### +1 1 2 1 3 1 4 1 +1 2 2 2 3 2 4 2 @@ -91,6 +93,7 @@ POSTHOOK: Input: default@skiphtbl POSTHOOK: Input: default@skiphtbl@b=1 POSTHOOK: Input: default@skiphtbl@b=2 #### A masked pattern was here #### +1 2 3 4 @@ -124,6 +127,7 @@ POSTHOOK: Input: default@skiphtbl POSTHOOK: Input: default@skiphtbl@b=1 POSTHOOK: Input: default@skiphtbl@b=2 #### A masked pattern was here #### +1 2 PREHOOK: query: SELECT MIN(b) FROM skipHTbl PREHOOK: type: QUERY @@ -137,7 +141,7 @@ POSTHOOK: Input: default@skiphtbl POSTHOOK: Input: default@skiphtbl@b=1 POSTHOOK: Input: default@skiphtbl@b=2 #### A masked pattern was here #### -2 +1 PREHOOK: query: SELECT DISTINCT a FROM skipHTbl PREHOOK: type: QUERY PREHOOK: Input: default@skiphtbl @@ -150,6 +154,7 @@ POSTHOOK: Input: default@skiphtbl POSTHOOK: Input: default@skiphtbl@b=1 POSTHOOK: Input: default@skiphtbl@b=2 #### A masked pattern was here #### +1 2 3 4 @@ -204,9 +209,11 @@ POSTHOOK: Input: default@skipftbl@b=2 1 1 2 1 3 1 +4 1 1 2 2 2 3 2 +4 2 PREHOOK: query: SELECT DISTINCT b FROM skipFTbl PREHOOK: type: QUERY PREHOOK: Input: default@skipftbl @@ -249,6 +256,7 @@ POSTHOOK: Input: default@skipftbl@b=2 1 2 3 +4 PREHOOK: query: DROP TABLE skipHTbl PREHOOK: type: DROPTABLE PREHOOK: Input: default@skiphtbl
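
Illustrative sketch, not part of the patch: the load-time stripping introduced in Hive.copyFileWithHeaderFooter amounts to "drop the first headerCount lines, withhold the last footerCount lines". The same idea in self-contained form, using plain java.io line streaming instead of the patch's byte-offset scan over FSDataInputStream; the class name and the file names in main() are made up for the example.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.Deque;

public class StripHeaderFooterDemo {

  // Copy src to dst, dropping the first headerCount lines and the last
  // footerCount lines. A deque of up to footerCount pending lines delays
  // output, so the file is handled in one pass without knowing its length
  // up front (the same role the deleted FooterBuffer played at read time).
  static void copyStripped(String src, String dst, int headerCount, int footerCount)
      throws IOException {
    try (BufferedReader in = new BufferedReader(new FileReader(src));
         PrintWriter out = new PrintWriter(new FileWriter(dst))) {
      for (int i = 0; i < headerCount; i++) {
        if (in.readLine() == null) {
          return; // file shorter than the header: nothing to copy
        }
      }
      Deque<String> pending = new ArrayDeque<String>();
      String line;
      while ((line = in.readLine()) != null) {
        pending.addLast(line);
        if (pending.size() > footerCount) {
          out.println(pending.removeFirst());
        }
      }
      // whatever is still pending at EOF is the footer; drop it
    }
  }

  public static void main(String[] args) throws IOException {
    copyStripped("header_footer_table_1.txt", "stripped.txt", 1, 2); // hypothetical files
  }
}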