diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
new file mode 100644
index 0000000..ac73c6d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.Progressable;
+
+
+/**
+ * Extends the RowContainer functionality to provide random access getAt(i).
+ * It extends RowContainer behavior in the following ways:
+ *
+ * 1. You must continue to call first to signal the transition from writing to the
+ *    Container to reading from it.
+ * 2. As rows are being added, each position at which a spill occurs is captured as a
+ *    BlockInfo object, which records the offset in the File at which the current
+ *    Block will be written.
+ * 3. When first is called, we associate with each BlockInfo the File Split that it
+ *    occurs in.
+ * 4. So, to read a random row from the Container, we do the following:
+ * 5. We track the span of the currentReadBlock, using currentReadBlockStartRow and
+ *    blockSize. If a row is requested in this span, we don't need to read rows from disk.
+ * 6. If the requested row is in the 'last' block, we point the currentReadBlock to
+ *    the currentWriteBlock, the same as what RowContainer does.
+ * 7. getAt leaves the Container in the same state as a next call, so getAt and next
+ *    calls can be interspersed.
+ *
+ */
+public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row> {
+
+  ArrayList<BlockInfo> blockInfos;
+  int currentReadBlockStartRow;
+
+  public PTFRowContainer(int bs, Configuration jc, Reporter reporter
+      ) throws HiveException {
+    super(bs, jc, reporter);
+    blockInfos = new ArrayList<BlockInfo>();
+  }
+
+  @Override
+  public void add(Row t) throws HiveException {
+    if ( willSpill() ) {
+      setupWriter();
+      PTFRecordWriter rw = (PTFRecordWriter) getRecordWriter();
+      BlockInfo blkInfo = new BlockInfo();
+      try {
+        blkInfo.startOffset = rw.outStream.getLength();
+        blockInfos.add(blkInfo);
+      } catch(IOException e) {
+        clear();
+        LOG.error(e.toString(), e);
+        throw new HiveException(e);
+      }
+    }
+    super.add(t);
+  }
+
+  @Override
+  public Row first() throws HiveException {
+    Row r = super.first();
+
+    InputSplit[] inputSplits = getInputSplits();
+    FileSplit fS = null;
+    BlockInfo bI = blockInfos.get(0);
+    bI.startingSplit = 0;
+    int i = 1;
+    bI = blockInfos.get(i);
+    for(int j=1; j < inputSplits.length && i < blockInfos.size(); j++) {
+      fS = (FileSplit) inputSplits[j];
+      while (bI.startOffset < fS.getStart() ) {
+        bI.startingSplit = j - 1;
+        i += 1;
+        if ( i < blockInfos.size() ) {
+          bI = blockInfos.get(i);
+        }
+      }
+    }
+
+    while ( i < blockInfos.size() ) {
+      bI = blockInfos.get(i);
+      bI.startingSplit = inputSplits.length - 1;
+      i++;
+    }
+
+    currentReadBlockStartRow = 0;
+
+    return r;
+  }
+
+  @Override
+  public Row next() throws HiveException {
+    boolean endOfCurrBlock = endOfCurrentReadBlock();
+    if ( endOfCurrBlock ) {
+      currentReadBlockStartRow += getCurrentReadBlockSize();
+    }
+    return super.next();
+  }
+
+  public Row getAt(int rowIdx) throws HiveException {
+    int blockSize = getBlockSize();
+    if ( rowIdx < currentReadBlockStartRow || rowIdx >= currentReadBlockStartRow + blockSize ) {
+      readBlock(getBlockNum(rowIdx));
+    }
+    return getReadBlockRow(rowIdx - currentReadBlockStartRow);
+  }
+
+  private int numBlocks() {
+    return blockInfos.size() + 1;
+  }
+
+  private int getBlockNum(int rowIdx) {
+    int blockSize = getBlockSize();
+    return rowIdx / blockSize;
+  }
+
+  private void readBlock(int blockNum) throws HiveException {
+    currentReadBlockStartRow = getBlockSize() * blockNum;
+
+    if ( blockNum == numBlocks() - 1 ) {
+      setWriteBlockAsReadBlock();
+      return;
+    }
+
+    resetCurrentReadBlockToFirstReadBlock();
+
+    BlockInfo bI = blockInfos.get(blockNum);
+    int startSplit = bI.startingSplit;
+    int endSplit = startSplit;
+    if ( blockNum != blockInfos.size() - 1) {
+      endSplit = blockInfos.get(blockNum+1).startingSplit;
+    }
+
+    try {
+      int readIntoOffset = 0;
+      for(int i = startSplit; i <= endSplit; i++ ) {
+        org.apache.hadoop.mapred.RecordReader rr = setReaderAtSplit(i);
+        if ( i == startSplit ) {
+          ((PTFSequenceFileRecordReader)rr).seek(bI.startOffset);
+        }
+        nextBlock(readIntoOffset);
+        readIntoOffset = getCurrentReadBlockSize();
+      }
+
+    } catch(Exception e) {
+      clear();
+      LOG.error(e.toString(), e);
+      if ( e instanceof HiveException ) {
+        throw (HiveException) e;
+      }
+      throw new HiveException(e);
+    }
+  }
+
+  private static class BlockInfo {
+    // position in file where the first row in this block starts
+    long startOffset;
+
+    // inputSplitNum that contains the first row in this block.
+    int startingSplit;
+  }
+
+  public static TableDesc createTableDesc(StructObjectInspector oI) {
+    Map<String, String> props = new HashMap<String, String>();
+    PTFDeserializer.addOIPropertiestoSerDePropsMap(oI, props);
+    String colNames = props.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
+    String colTypes = props.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES);
+    TableDesc tblDesc = new TableDesc(LazyBinarySerDe.class,
+        PTFSequenceFileInputFormat.class, PTFHiveSequenceFileOutputFormat.class,
+        Utilities.makeProperties(
+            org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, ""
+                + Utilities.ctrlaCode,
+            org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, colNames
+                .toString(),
+            org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES,
+            colTypes.toString()));
+    return tblDesc;
+  }
+
+
+  private static class PTFRecordWriter implements RecordWriter {
+    BytesWritable EMPTY_KEY = new BytesWritable();
+
+    SequenceFile.Writer outStream;
+
+    public PTFRecordWriter(SequenceFile.Writer outStream) {
+      this.outStream = outStream;
+    }
+
+    public void write(Writable r) throws IOException {
+      outStream.append(EMPTY_KEY, r);
+    }
+
+    public void close(boolean abort) throws IOException {
+      outStream.close();
+    }
+  }
+
+  public static class PTFHiveSequenceFileOutputFormat extends HiveSequenceFileOutputFormat {
+
+    @Override
+    public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+        Class<? extends Writable> valueClass, boolean isCompressed,
+        Properties tableProperties, Progressable progress) throws IOException {
+
+      FileSystem fs = finalOutPath.getFileSystem(jc);
+      final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
+          fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
+
+      return new PTFRecordWriter(outStream);
+    }
+
+  }
+
+  public static class PTFSequenceFileInputFormat extends SequenceFileInputFormat {
+
+    public PTFSequenceFileInputFormat() {
+      super();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter)
+        throws IOException {
+      reporter.setStatus(split.toString());
+      return new PTFSequenceFileRecordReader(job, (FileSplit) split);
+    }
+  }
+
+  public static class PTFSequenceFileRecordReader extends SequenceFileRecordReader {
+
+    public PTFSequenceFileRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+      super(conf, split);
+
+    }
+    @Override
+    public void seek(long pos) throws IOException {
+      super.seek(pos);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 1c7ab7a..b6352fd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -30,8 +30,8 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -142,7 +142,7 @@ public RowContainer(int bs, Configuration jc, Reporter reporter
       this.reporter = reporter;
     }
   }
-  
+
   private JobConf getLocalFSJobConfClone(Configuration jc) {
     if (this.jobCloneUsingLocalFs == null) {
       this.jobCloneUsingLocalFs = new JobConf(jc);
@@ -160,7 +160,7 @@ public void setSerDe(SerDe sd, ObjectInspector oi) {
   @Override
   public void add(Row t) throws HiveException {
     if (this.tblDesc != null) {
-      if (addCursor >= blockSize) { // spill the current block to tmp file
+      if (willSpill()) { // spill the current block to tmp file
         spillBlock(currentWriteBlock, addCursor);
         addCursor = 0;
         if (numFlushedBlocks == 1) {
@@ -221,7 +221,7 @@ public Row first() throws HiveException {
             localJc, reporter);
         currentSplitPointer++;
 
-        nextBlock();
+        nextBlock(0);
       }
       // we are guaranteed that we can get data here (since 'size' is not zero)
       Row ret = currentReadBlock[itrCursor++];
@@ -258,13 +258,10 @@ public Row next() throws HiveException {
         removeKeys(ret);
         return ret;
       } else {
-        nextBlock();
+        nextBlock(0);
        if (this.readBlockSize == 0) {
          if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
-            this.itrCursor = 0;
-            this.readBlockSize = this.addCursor;
-            this.firstReadBlockPointer = this.currentReadBlock;
-            currentReadBlock = currentWriteBlock;
+            setWriteBlockAsReadBlock();
          } else {
            return null;
          }
@@ -288,36 +285,7 @@ private void removeKeys(Row ret) {
   private void spillBlock(Row[] block, int length) throws HiveException {
     try {
       if (tmpFile == null) {
-
-        String suffix = ".tmp";
-        if (this.keyObject != null) {
-          suffix = "." + this.keyObject.toString() + suffix;
-        }
-
-        while (true) {
-          parentFile = File.createTempFile("hive-rowcontainer", "");
-          boolean success = parentFile.delete() && parentFile.mkdir();
-          if (success) {
-            break;
-          }
-          LOG.debug("retry creating tmp row-container directory...");
-        }
-
-        tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
-        LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
-        // Delete the temp file if the JVM terminate normally through Hadoop job
-        // kill command.
-        // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
-        parentFile.deleteOnExit();
-        tmpFile.deleteOnExit();
-
-        // rFile = new RandomAccessFile(tmpFile, "rw");
-        HiveOutputFormat hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
-        tempOutPath = new Path(tmpFile.toString());
-        JobConf localJc = getLocalFSJobConfClone(jc);
-        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
-            hiveOutputFormat, serde.getSerializedClass(), false,
-            tblDesc.getProperties(), tempOutPath, reporter);
+        setupWriter();
       } else if (rw == null) {
         throw new HiveException("RowContainer has already been closed for writing.");
       }
@@ -350,6 +318,9 @@ private void spillBlock(Row[] block, int length) throws HiveException {
     } catch (Exception e) {
       clear();
       LOG.error(e.toString(), e);
+      if ( e instanceof HiveException ) {
+        throw (HiveException) e;
+      }
       throw new HiveException(e);
     }
   }
@@ -364,7 +335,7 @@ public long size() {
     return size;
   }
 
-  private boolean nextBlock() throws HiveException {
+  protected boolean nextBlock(int readIntoOffset) throws HiveException {
     itrCursor = 0;
     this.readBlockSize = 0;
     if (this.numFlushedBlocks == 0) {
@@ -376,7 +347,7 @@
         val = serde.getSerializedClass().newInstance();
       }
       boolean nextSplit = true;
-      int i = 0;
+      int i = readIntoOffset;
 
       if (rr != null) {
         Object key = rr.createKey();
@@ -393,7 +364,7 @@
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
             reporter);
         currentSplitPointer++;
-        return nextBlock();
+        return nextBlock(0);
       }
 
       this.readBlockSize = i;
@@ -504,4 +475,108 @@ public void setTableDesc(TableDesc tblDesc) {
     this.tblDesc = tblDesc;
   }
 
+  protected int getAddCursor() {
+    return addCursor;
+  }
+
+  protected final boolean willSpill() {
+    return addCursor >= blockSize;
+  }
+
+  protected int getBlockSize() {
+    return blockSize;
+  }
+
+  protected void setupWriter() throws HiveException {
+    try {
+
+      if ( tmpFile != null ) {
+        return;
+      }
+
+      String suffix = ".tmp";
+      if (this.keyObject != null) {
+        suffix = "." + this.keyObject.toString() + suffix;
+      }
+
+      while (true) {
+        parentFile = File.createTempFile("hive-rowcontainer", "");
+        boolean success = parentFile.delete() && parentFile.mkdir();
+        if (success) {
+          break;
+        }
+        LOG.debug("retry creating tmp row-container directory...");
+      }
+
+      tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+      LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+      // Delete the temp file if the JVM terminate normally through Hadoop job
+      // kill command.
+      // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
+      parentFile.deleteOnExit();
+      tmpFile.deleteOnExit();
+
+      // rFile = new RandomAccessFile(tmpFile, "rw");
+      HiveOutputFormat hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+      tempOutPath = new Path(tmpFile.toString());
+      JobConf localJc = getLocalFSJobConfClone(jc);
+      rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+          hiveOutputFormat, serde.getSerializedClass(), false,
+          tblDesc.getProperties(), tempOutPath, reporter);
+    } catch (Exception e) {
+      clear();
+      LOG.error(e.toString(), e);
+      throw new HiveException(e);
+    }
+
+  }
+
+  protected RecordWriter getRecordWriter() {
+    return rw;
+  }
+
+  protected InputSplit[] getInputSplits() {
+    return inputSplits;
+  }
+
+  protected boolean endOfCurrentReadBlock() {
+    if (tblDesc == null) {
+      return false;
+    }
+    return itrCursor >= this.readBlockSize;
+  }
+
+  protected int getCurrentReadBlockSize() {
+    return readBlockSize;
+  }
+
+  protected void setWriteBlockAsReadBlock() {
+    this.itrCursor = 0;
+    this.readBlockSize = this.addCursor;
+    this.firstReadBlockPointer = this.currentReadBlock;
+    currentReadBlock = currentWriteBlock;
+  }
+
+  protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum) throws IOException {
+    JobConf localJc = getLocalFSJobConfClone(jc);
+    currentSplitPointer = splitNum;
+    if ( rr != null ) {
+      rr.close();
+    }
+    // open record reader to read next split
+    rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+        reporter);
+    currentSplitPointer++;
+    return rr;
+  }
+
+  protected Row getReadBlockRow(int rowOffset) {
+    itrCursor = rowOffset + 1;
+    return currentReadBlock[rowOffset];
+  }
+
+  protected void resetCurrentReadBlockToFirstReadBlock() {
+    currentReadBlock = firstReadBlockPointer;
+  }
+
 }
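
Reviewer note (not part of the patch): a minimal sketch of how a caller is expected to drive the new random-access API, based only on the methods visible in this diff (createTableDesc, setTableDesc, setSerDe, add, first, getAt, next). The serDe/rowOI/jobConf setup, the block size of 1000, the method name, and the variable names are illustrative assumptions rather than code taken from the PTF operator.

    // Hypothetical driver; assumes serDe, rowOI and jobConf are configured the way
    // the PTF operator would configure them. Exception handling is left to the caller.
    void sketch(SerDe serDe, StructObjectInspector rowOI, Configuration jobConf,
        Iterable<List<Object>> inputRows) throws HiveException {
      PTFRowContainer<List<Object>> part =
          new PTFRowContainer<List<Object>>(1000, jobConf, null);   // 1000-row blocks
      part.setTableDesc(PTFRowContainer.createTableDesc(rowOI));    // spill-file format
      part.setSerDe(serDe, rowOI);

      for (List<Object> row : inputRows) {
        part.add(row);                  // each spill records a BlockInfo with its file offset
      }

      part.first();                     // signal the switch from writing to reading
      List<Object> r = part.getAt(42);  // random access; a block is read only on a span miss
      List<Object> s = part.next();     // sequential next() may be interleaved with getAt()
    }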