diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
new file mode 100644
index 0000000..ac73c6d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.Progressable;
+
+
+/**
+ * Extends the RowContainer functionality to provide random access getAt(i).
+ * It extends RowContainer behavior in the following ways:
+ *
+ * - You must continue to call first to signal the transition from writing to the
+ * Container to reading from it.
+ *
+ * - As rows are being added, each position at which a spill occurs is captured as a
+ * BlockInfo object. At that point it records the offset in the file at which the current
+ * block will be written.
+ *
+ * - When first is called: we associate with each BlockInfo the file Split that it
+ * occurs in.
+ *
+ * - So in order to read a random row from the Container we do the following:
+ *
+ * - Convert the row index into a block number. This is easy because all blocks are
+ * the same size, given by the <code>blockSize</code>.
+ * - The corresponding BlockInfo tells us the Split that this block starts in. Also,
+ * by looking at the next Block in the BlockInfos list, we know which Split this block ends in.
+ * - So we arrange to read all the Splits that contain rows for this block. For the first
+ * Split we seek to the startOffset that we captured in the BlockInfo.
+ * - After reading the Splits, all rows in this block are in the 'currentReadBlock'.
+ *
+ * - We track the span of the currentReadBlock, using
+ * <code>currentReadBlockStartRow, blockSize</code>. So if a row is requested in this span,
+ * we don't need to read rows from disk.
+ * - If the requested row is in the 'last' block, we point the currentReadBlock to
+ * the currentWriteBlock, the same as what RowContainer does.
+ *
+ * - A <code>getAt</code> call leaves the Container in the same state as a
+ * <code>next</code> call, so getAt and next calls can be interspersed.
+ *
+ */
+public class PTFRowContainer<Row extends List<Object>> extends RowContainer<Row> {
+
+ ArrayList<BlockInfo> blockInfos;
+ int currentReadBlockStartRow;
+
+ public PTFRowContainer(int bs, Configuration jc, Reporter reporter
+ ) throws HiveException {
+ super(bs, jc, reporter);
+ blockInfos = new ArrayList<BlockInfo>();
+ }
+
+ @Override
+ public void add(Row t) throws HiveException {
+ if ( willSpill() ) {
+ setupWriter();
+ PTFRecordWriter rw = (PTFRecordWriter) getRecordWriter();
+ BlockInfo blkInfo = new BlockInfo();
+ try {
+ blkInfo.startOffset = rw.outStream.getLength();
+ blockInfos.add(blkInfo);
+ } catch(IOException e) {
+ clear();
+ LOG.error(e.toString(), e);
+ throw new HiveException(e);
+ }
+ }
+ super.add(t);
+ }
+
+ @Override
+ public Row first() throws HiveException {
+ Row r = super.first();
+
+ if ( blockInfos.size() > 0 ) {
+ InputSplit[] inputSplits = getInputSplits();
+ FileSplit fS = null;
+ BlockInfo bI = blockInfos.get(0);
+ bI.startingSplit = 0;
+ int i = 1;
+ bI = i < blockInfos.size() ? blockInfos.get(i) : null;
+ for(int j=1; j < inputSplits.length && i < blockInfos.size(); j++) {
+ fS = (FileSplit) inputSplits[j];
+ while (bI != null && bI.startOffset < fS.getStart() ) {
+ bI.startingSplit = j - 1;
+ i += 1;
+ bI = i < blockInfos.size() ? blockInfos.get(i) : null;
+ }
+ }
+
+ while ( i < blockInfos.size() ) {
+ bI = blockInfos.get(i);
+ bI.startingSplit = inputSplits.length - 1;
+ i++;
+ }
+ }
+
+ currentReadBlockStartRow = 0;
+
+ return r;
+ }
+
+ @Override
+ public Row next() throws HiveException {
+ boolean endOfCurrBlock = endOfCurrentReadBlock();
+ if ( endOfCurrBlock ) {
+ currentReadBlockStartRow += getCurrentReadBlockSize();
+ }
+ return super.next();
+ }
+
+ public Row getAt(int rowIdx) throws HiveException {
+ int blockSize = getBlockSize();
+ if ( rowIdx < currentReadBlockStartRow || rowIdx >= currentReadBlockStartRow + blockSize ) {
+ readBlock(getBlockNum(rowIdx));
+ }
+ return getReadBlockRow(rowIdx - currentReadBlockStartRow);
+ }
+
+ private int numBlocks() {
+ return blockInfos.size() + 1;
+ }
+
+ private int getBlockNum(int rowIdx) {
+ int blockSize = getBlockSize();
+ return rowIdx / blockSize;
+ }
+
+ private void readBlock(int blockNum) throws HiveException {
+ currentReadBlockStartRow = getBlockSize() * blockNum;
+
+ if ( blockNum == numBlocks() - 1 ) {
+ setWriteBlockAsReadBlock();
+ return;
+ }
+
+ resetCurrentReadBlockToFirstReadBlock();
+
+ BlockInfo bI = blockInfos.get(blockNum);
+ int startSplit = bI.startingSplit;
+ int endSplit = startSplit;
+ if ( blockNum != blockInfos.size() - 1) {
+ endSplit = blockInfos.get(blockNum+1).startingSplit;
+ }
+
+ try {
+ int readIntoOffset = 0;
+ for(int i = startSplit; i <= endSplit; i++ ) {
+ org.apache.hadoop.mapred.RecordReader rr = setReaderAtSplit(i);
+ if ( i == startSplit ) {
+ ((PTFSequenceFileRecordReader)rr).seek(bI.startOffset);
+ }
+ nextBlock(readIntoOffset);
+ readIntoOffset = getCurrentReadBlockSize();
+ }
+
+ } catch(Exception e) {
+ clear();
+ LOG.error(e.toString(), e);
+ if ( e instanceof HiveException ) {
+ throw (HiveException) e;
+ }
+ throw new HiveException(e);
+ }
+ }
+
+ private static class BlockInfo {
+ // position in file where the first row in this block starts
+ long startOffset;
+
+ // inputSplitNum that contains the first row in this block.
+ int startingSplit;
+ }
+
+ public static TableDesc createTableDesc(StructObjectInspector oI) {
+ Map<String, String> props = new HashMap<String, String>();
+ PTFDeserializer.addOIPropertiestoSerDePropsMap(oI, props);
+ String colNames = props.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
+ String colTypes = props.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES);
+ TableDesc tblDesc = new TableDesc(LazyBinarySerDe.class,
+ PTFSequenceFileInputFormat.class, PTFHiveSequenceFileOutputFormat.class,
+ Utilities.makeProperties(
+ org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, ""
+ + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, colNames
+ .toString(),
+ org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES,
+ colTypes.toString()));
+ return tblDesc;
+ }
+
+
+ private static class PTFRecordWriter implements RecordWriter {
+ BytesWritable EMPTY_KEY = new BytesWritable();
+
+ SequenceFile.Writer outStream;
+
+ public PTFRecordWriter(SequenceFile.Writer outStream) {
+ this.outStream = outStream;
+ }
+
+ public void write(Writable r) throws IOException {
+ outStream.append(EMPTY_KEY, r);
+ }
+
+ public void close(boolean abort) throws IOException {
+ outStream.close();
+ }
+ }
+
+ public static class PTFHiveSequenceFileOutputFormat extends HiveSequenceFileOutputFormat {
+
+ @Override
+ public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ Class<? extends Writable> valueClass, boolean isCompressed,
+ Properties tableProperties, Progressable progress) throws IOException {
+
+ FileSystem fs = finalOutPath.getFileSystem(jc);
+ final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
+ fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
+
+ return new PTFRecordWriter(outStream);
+ }
+
+ }
+
+ public static class PTFSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {
+
+ public PTFSequenceFileInputFormat() {
+ super();
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<K, V> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter)
+ throws IOException {
+ reporter.setStatus(split.toString());
+ return new PTFSequenceFileRecordReader<K, V>(job, (FileSplit) split);
+ }
+ }
+
+ public static class PTFSequenceFileRecordReader<K, V> extends SequenceFileRecordReader<K, V> {
+
+ public PTFSequenceFileRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ super(conf, split);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ super.seek(pos);
+ }
+ }
+}
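
Note on the block arithmetic described in the PTFRowContainer class javadoc above: the sketch below is not part of the patch. It spells out how getAt(i) decides whether a requested row can be served from the block already in memory or needs a readBlock() call. The blockSize of 25 and the row indices are made-up values for illustration; only the method names mirror the patch.

    // Minimal, self-contained sketch of PTFRowContainer's index-to-block mapping.
    public class BlockArithmeticSketch {

      // Every spilled block holds exactly blockSize rows, so integer division
      // maps a row index to its block number (mirrors getBlockNum in the patch).
      static int getBlockNum(int rowIdx, int blockSize) {
        return rowIdx / blockSize;
      }

      // A row is served from memory when it falls inside the span
      // [currentReadBlockStartRow, currentReadBlockStartRow + blockSize).
      static boolean inCurrentReadBlock(int rowIdx, int currentReadBlockStartRow, int blockSize) {
        return rowIdx >= currentReadBlockStartRow
            && rowIdx < currentReadBlockStartRow + blockSize;
      }

      public static void main(String[] args) {
        int blockSize = 25;                 // assumed value, for illustration
        int currentReadBlockStartRow = 50;  // i.e. block 2 is currently in memory

        System.out.println(getBlockNum(60, blockSize));                                   // 2
        System.out.println(inCurrentReadBlock(60, currentReadBlockStartRow, blockSize));  // true -> no disk read
        System.out.println(getBlockNum(130, blockSize));                                  // 5
        System.out.println(inCurrentReadBlock(130, currentReadBlockStartRow, blockSize)); // false -> readBlock(5)
      }
    }
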
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 1c7ab7a..b6352fd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -30,8 +30,8 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -142,7 +142,7 @@ public RowContainer(int bs, Configuration jc, Reporter reporter
this.reporter = reporter;
}
}
-
+
private JobConf getLocalFSJobConfClone(Configuration jc) {
if (this.jobCloneUsingLocalFs == null) {
this.jobCloneUsingLocalFs = new JobConf(jc);
@@ -160,7 +160,7 @@ public void setSerDe(SerDe sd, ObjectInspector oi) {
@Override
public void add(Row t) throws HiveException {
if (this.tblDesc != null) {
- if (addCursor >= blockSize) { // spill the current block to tmp file
+ if (willSpill()) { // spill the current block to tmp file
spillBlock(currentWriteBlock, addCursor);
addCursor = 0;
if (numFlushedBlocks == 1) {
@@ -221,7 +221,7 @@ public Row first() throws HiveException {
localJc, reporter);
currentSplitPointer++;
- nextBlock();
+ nextBlock(0);
}
// we are guaranteed that we can get data here (since 'size' is not zero)
Row ret = currentReadBlock[itrCursor++];
@@ -258,13 +258,10 @@ public Row next() throws HiveException {
removeKeys(ret);
return ret;
} else {
- nextBlock();
+ nextBlock(0);
if (this.readBlockSize == 0) {
if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
- this.itrCursor = 0;
- this.readBlockSize = this.addCursor;
- this.firstReadBlockPointer = this.currentReadBlock;
- currentReadBlock = currentWriteBlock;
+ setWriteBlockAsReadBlock();
} else {
return null;
}
@@ -288,36 +285,7 @@ private void removeKeys(Row ret) {
private void spillBlock(Row[] block, int length) throws HiveException {
try {
if (tmpFile == null) {
-
- String suffix = ".tmp";
- if (this.keyObject != null) {
- suffix = "." + this.keyObject.toString() + suffix;
- }
-
- while (true) {
- parentFile = File.createTempFile("hive-rowcontainer", "");
- boolean success = parentFile.delete() && parentFile.mkdir();
- if (success) {
- break;
- }
- LOG.debug("retry creating tmp row-container directory...");
- }
-
- tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
- LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
- // Delete the temp file if the JVM terminate normally through Hadoop job
- // kill command.
- // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
- parentFile.deleteOnExit();
- tmpFile.deleteOnExit();
-
- // rFile = new RandomAccessFile(tmpFile, "rw");
- HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
- tempOutPath = new Path(tmpFile.toString());
- JobConf localJc = getLocalFSJobConfClone(jc);
- rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
- hiveOutputFormat, serde.getSerializedClass(), false,
- tblDesc.getProperties(), tempOutPath, reporter);
+ setupWriter();
} else if (rw == null) {
throw new HiveException("RowContainer has already been closed for writing.");
}
@@ -350,6 +318,9 @@ private void spillBlock(Row[] block, int length) throws HiveException {
} catch (Exception e) {
clear();
LOG.error(e.toString(), e);
+ if ( e instanceof HiveException ) {
+ throw (HiveException) e;
+ }
throw new HiveException(e);
}
}
@@ -364,7 +335,7 @@ public long size() {
return size;
}
- private boolean nextBlock() throws HiveException {
+ protected boolean nextBlock(int readIntoOffset) throws HiveException {
itrCursor = 0;
this.readBlockSize = 0;
if (this.numFlushedBlocks == 0) {
@@ -376,7 +347,7 @@ private boolean nextBlock() throws HiveException {
val = serde.getSerializedClass().newInstance();
}
boolean nextSplit = true;
- int i = 0;
+ int i = readIntoOffset;
if (rr != null) {
Object key = rr.createKey();
@@ -393,7 +364,7 @@ private boolean nextBlock() throws HiveException {
rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
reporter);
currentSplitPointer++;
- return nextBlock();
+ return nextBlock(0);
}
this.readBlockSize = i;
@@ -504,4 +475,108 @@ public void setTableDesc(TableDesc tblDesc) {
this.tblDesc = tblDesc;
}
+ protected int getAddCursor() {
+ return addCursor;
+ }
+
+ protected final boolean willSpill() {
+ return addCursor >= blockSize;
+ }
+
+ protected int getBlockSize() {
+ return blockSize;
+ }
+
+ protected void setupWriter() throws HiveException {
+ try {
+
+ if ( tmpFile != null ) {
+ return;
+ }
+
+ String suffix = ".tmp";
+ if (this.keyObject != null) {
+ suffix = "." + this.keyObject.toString() + suffix;
+ }
+
+ while (true) {
+ parentFile = File.createTempFile("hive-rowcontainer", "");
+ boolean success = parentFile.delete() && parentFile.mkdir();
+ if (success) {
+ break;
+ }
+ LOG.debug("retry creating tmp row-container directory...");
+ }
+
+ tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+ LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+ // Delete the temp file if the JVM terminates normally, e.g. through the
+ // Hadoop job kill command.
+ // Caveat: it won't be deleted if the JVM is killed by 'kill -9'.
+ parentFile.deleteOnExit();
+ tmpFile.deleteOnExit();
+
+ // rFile = new RandomAccessFile(tmpFile, "rw");
+ HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+ tempOutPath = new Path(tmpFile.toString());
+ JobConf localJc = getLocalFSJobConfClone(jc);
+ rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+ hiveOutputFormat, serde.getSerializedClass(), false,
+ tblDesc.getProperties(), tempOutPath, reporter);
+ } catch (Exception e) {
+ clear();
+ LOG.error(e.toString(), e);
+ throw new HiveException(e);
+ }
+
+ }
+
+ protected RecordWriter getRecordWriter() {
+ return rw;
+ }
+
+ protected InputSplit[] getInputSplits() {
+ return inputSplits;
+ }
+
+ protected boolean endOfCurrentReadBlock() {
+ if (tblDesc == null) {
+ return false;
+ }
+ return itrCursor >= this.readBlockSize;
+ }
+
+ protected int getCurrentReadBlockSize() {
+ return readBlockSize;
+ }
+
+ protected void setWriteBlockAsReadBlock() {
+ this.itrCursor = 0;
+ this.readBlockSize = this.addCursor;
+ this.firstReadBlockPointer = this.currentReadBlock;
+ currentReadBlock = currentWriteBlock;
+ }
+
+ protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum) throws IOException {
+ JobConf localJc = getLocalFSJobConfClone(jc);
+ currentSplitPointer = splitNum;
+ if ( rr != null ) {
+ rr.close();
+ }
+ // open record reader to read next split
+ rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+ reporter);
+ currentSplitPointer++;
+ return rr;
+ }
+
+ protected Row getReadBlockRow(int rowOffset) {
+ itrCursor = rowOffset + 1;
+ return currentReadBlock[rowOffset];
+ }
+
+ protected void resetCurrentReadBlockToFirstReadBlock() {
+ currentReadBlock = firstReadBlockPointer;
+ }
+
}
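
For context, here is a hedged usage sketch, not part of the patch, of how a caller might drive the new container: buffer a partition with add(), then intersperse sequential next() iteration with random getAt() lookups, as the class javadoc promises. The container, getAt, first, next, size, setSerDe, setTableDesc and createTableDesc names come from the diff; the lookahead pattern, block size, and serde/objectinspector wiring are assumptions for illustration.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.persistence.PTFRowContainer;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.SerDe;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.mapred.Reporter;

    public class PTFRowContainerUsageSketch {

      // Buffer a partition's rows; spills past blockSize are handled by the container,
      // which records a BlockInfo per spilled block.
      static PTFRowContainer<List<Object>> buildPartition(List<List<Object>> rows,
          SerDe serDe, StructObjectInspector oi, Configuration jc, Reporter reporter)
          throws HiveException {
        int blockSize = 25;  // illustrative; real callers size this from configuration
        PTFRowContainer<List<Object>> partition =
            new PTFRowContainer<List<Object>>(blockSize, jc, reporter);
        partition.setSerDe(serDe, oi);
        partition.setTableDesc(PTFRowContainer.createTableDesc(oi));
        for (List<Object> r : rows) {
          partition.add(r);
        }
        return partition;
      }

      // Iterate sequentially while also peeking 'offset' rows ahead -- the access
      // pattern a lead/lag style function needs, mixing next() and getAt().
      static void iterateWithLookahead(PTFRowContainer<List<Object>> partition, int offset)
          throws HiveException {
        int idx = 0;
        List<Object> row = partition.first();    // switch from write mode to read mode
        while (row != null) {
          int peekIdx = idx + offset;
          if (peekIdx < partition.size()) {
            List<Object> ahead = partition.getAt(peekIdx);  // random access; may trigger readBlock()
            // ... evaluate the function over (row, ahead) ...
          }
          row = partition.next();                // sequential access still works after getAt()
          idx++;
        }
      }
    }
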