diff --git data/files/datatypes.txt data/files/datatypes.txt
index 0228a27..458c5bd 100644
--- data/files/datatypes.txt
+++ data/files/datatypes.txt
@@ -1,3 +1,3 @@
-\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N
+\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
+-1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
 1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22 09:00:00.123456789123456789.0123456YWJjZA==2013-01-01abc123abc123X'01FF'
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f85a621..72b6b39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -215,7 +215,7 @@ public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     String tableName = "result";
     List lst = null;
     try {
-      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
     } catch (Exception e) {
       LOG.warn("Error getting schema: "
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 89fff81..de6f7a0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -26,7 +26,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,10 +36,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.FooterBuffer;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveRecordReader;
@@ -48,23 +48,19 @@
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -73,71 +69,75 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * FetchTask implementation.
  **/
 public class FetchOperator implements Serializable {
-  static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
-  static LogHelper console = new LogHelper(LOG);
+  static final Log LOG = LogFactory.getLog(FetchOperator.class.getName());
+  static final LogHelper console = new LogHelper(LOG);
 
-  private boolean isNativeTable;
   private FetchWork work;
   protected Operator operator; // operator tree for processing row further (option)
 
-  private int splitNum;
-  private PartitionDesc currPart;
-  private TableDesc currTbl;
-  private boolean tblDataDone;
   private FooterBuffer footerBuffer = null;
   private int headerCount = 0;
   private int footerCount = 0;
 
-  private boolean hasVC;
-  private boolean isPartitioned;
+  private final boolean hasVC;
+  private final boolean isStatReader;
+  private final boolean isPartitioned;
+  private final boolean isNonNativeTable;
   private StructObjectInspector vcsOI;
   private List vcCols;
   private ExecMapperContext context;
 
+  private transient Deserializer tableSerDe;
+
+  private transient Iterator iterPath;
+  private transient Iterator iterPartDesc;
+  private transient Iterator iterSplits = Iterators.emptyIterator();
+
+  private transient Path currPath;
+  private transient PartitionDesc currDesc;
+  private transient Deserializer currSerDe;
+  private transient Converter partTblObjectInspectorConverter;
+
   private transient RecordReader currRecReader;
-  private transient FetchInputFormatSplit[] inputSplits;
-  private transient InputFormat inputFormat;
+
   private transient JobConf job;
   private transient WritableComparable key;
   private transient Writable value;
   private transient Object[] vcValues;
-  private transient Deserializer serde;
-  private transient Deserializer tblSerde;
-  private transient Converter partTblObjectInspectorConverter;
-
-  private transient Iterator iterPath;
-  private transient Iterator iterPartDesc;
-  private transient Path currPath;
-  private transient StructObjectInspector objectInspector;
-  private transient StructObjectInspector rowObjectInspector;
-  private transient ObjectInspector partitionedTableOI;
+
+  private transient StructObjectInspector delegatedOI;
+  private transient ObjectInspector convertibleOI;
+  private transient ObjectInspector outputOI;
+
   private transient Object[] row;
 
-  public FetchOperator() {
-  }
-
-  public FetchOperator(FetchWork work, JobConf job) {
-    this.job = job;
-    this.work = work;
-    initialize();
+  public FetchOperator(FetchWork work, JobConf job) throws HiveException {
+    this(work, job, null, null);
   }
 
   public FetchOperator(FetchWork work, JobConf job, Operator operator,
-      List vcCols) {
+      List vcCols) throws HiveException {
     this.job = job;
     this.work = work;
     this.operator = operator;
     this.vcCols = vcCols;
+    this.hasVC = vcCols != null && !vcCols.isEmpty();
+    this.isStatReader = work.getTblDesc() == null;
+    this.isPartitioned = !isStatReader && work.isPartitioned();
+    this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
     initialize();
   }
 
-  private void initialize() {
-    if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+  private void initialize() throws HiveException {
+    if (isStatReader) {
+      outputOI = work.getStatRowOI();
+      return;
+    }
+    if (hasVC) {
       List names = new ArrayList(vcCols.size());
       List inspectors = new ArrayList(vcCols.size());
       for (VirtualColumn vc : vcCols) {
@@ -147,8 +147,6 @@ private void initialize() {
       vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
       vcValues = new Object[vcCols.size()];
     }
-    isPartitioned = work.isPartitioned();
-    tblDataDone = false;
     if (hasVC && isPartitioned) {
       row = new Object[3];
     } else if (hasVC || isPartitioned) {
@@ -156,21 +154,26 @@ private void initialize() {
     } else {
       row = new Object[1];
     }
-    if (work.getTblDesc() != null) {
-      isNativeTable = !work.getTblDesc().isNonNative();
+    if (isPartitioned) {
+      iterPath = work.getPartDir().iterator();
+      iterPartDesc = work.getPartDesc().iterator();
     } else {
-      isNativeTable = true;
+      iterPath = Arrays.asList(work.getTblDir()).iterator();
+      iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
     }
-    setupExecContext();
+    outputOI = setupOutputObjectInspector();
+    context = setupExecContext(operator);
   }
 
-  private void setupExecContext() {
+  private ExecMapperContext setupExecContext(Operator operator) {
+    ExecMapperContext context = null;
     if (hasVC || work.getSplitSample() != null) {
       context = new ExecMapperContext();
       if (operator != null) {
         operator.setExecContext(context);
       }
     }
+    return context;
   }
 
   public FetchWork getWork() {
@@ -181,42 +184,6 @@ public void setWork(FetchWork work) {
     this.work = work;
   }
 
-  public int getSplitNum() {
-    return splitNum;
-  }
-
-  public void setSplitNum(int splitNum) {
-    this.splitNum = splitNum;
-  }
-
-  public PartitionDesc getCurrPart() {
-    return currPart;
-  }
-
-  public void setCurrPart(PartitionDesc currPart) {
-    this.currPart = currPart;
-  }
-
-  public TableDesc getCurrTbl() {
-    return currTbl;
-  }
-
-  public void setCurrTbl(TableDesc currTbl) {
-    this.currTbl = currTbl;
-  }
-
-  public boolean isTblDataDone() {
-    return tblDataDone;
-  }
-
-  public void setTblDataDone(boolean tblDataDone) {
-    this.tblDataDone = tblDataDone;
-  }
-
-  public boolean isEmptyTable() {
-    return work.getTblDir() == null && (work.getPartDir() == null || work.getPartDir().isEmpty());
-  }
-
   /**
    * A cache of InputFormat instances.
   */
@@ -238,145 +205,79 @@ public boolean isEmptyTable() {
     return inputFormats.get(inputFormatClass);
   }
 
-  private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
-  }
-
-  private StructObjectInspector getRowInspectorFromPartition(PartitionDesc partition,
-      ObjectInspector partitionOI) throws Exception {
-
-    String pcols = partition.getTableDesc().getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = partition.getTableDesc().getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-    String[] partKeyTypes = pcolTypes.trim().split(":");
-    row[1] = createPartValue(partKeys, partition.getPartSpec(), partKeyTypes);
-
-    return createRowInspector(getStructOIFrom(partitionOI), partKeys, partKeyTypes);
+  private StructObjectInspector getRowInspectorFromTable(TableDesc table,
+      StructObjectInspector outputOI) throws Exception {
+    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
+        Arrays.asList(outputOI, vcsOI)) : outputOI;
   }
 
-  private StructObjectInspector getRowInspectorFromPartitionedTable(TableDesc table)
-      throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
-    String pcols = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-    String[] partKeyTypes = pcolTypes.trim().split(":");
-    row[1] = null;
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()), partKeys, partKeyTypes);
+  private StructObjectInspector getRowInspectorFromPartitionedTable(PartitionDesc partDesc,
+      ObjectInspector outputOI) throws Exception {
+    StructObjectInspector rowOI = toDelegatedOI(outputOI);
+    StructObjectInspector partKeyOI = getPartitionKeyOI(partDesc.getTableDesc());
+    row[1] = partDesc.getPartSpec() == null ? null : createPartValue(partDesc, partKeyOI);
+    return ObjectInspectorFactory.getUnionStructObjectInspector(
+        hasVC ? Arrays.asList(rowOI, partKeyOI, vcsOI) : Arrays.asList(rowOI, partKeyOI));
   }
 
-  private StructObjectInspector getStructOIFrom(ObjectInspector current) throws SerDeException {
-    if (objectInspector != null) {
-      current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
+  // returns delegated OI, of which the inside OIs can be replaced (for partition-wise format)
+  private StructObjectInspector toDelegatedOI(ObjectInspector current) throws SerDeException {
+    if (delegatedOI != null) {
+      current = DelegatedObjectInspectorFactory.reset(delegatedOI, current);
     } else {
       current = DelegatedObjectInspectorFactory.wrap(current);
     }
-    return objectInspector = (StructObjectInspector) current;
+    return delegatedOI = (StructObjectInspector) current;
  }
 
-  private StructObjectInspector createRowInspector(StructObjectInspector current)
-      throws SerDeException {
-    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
-        Arrays.asList(current, vcsOI)) : current;
-  }
+  private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
+    String pcols = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    String pcolTypes = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
 
-  private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys, String[] partKeyTypes)
-      throws SerDeException {
-    List partNames = new ArrayList();
-    List partObjectInspectors = new ArrayList();
+    String[] partKeys = pcols.trim().split("/");
+    String[] partKeyTypes = pcolTypes.trim().split(":");
+    ObjectInspector[] inspectors = new ObjectInspector[partKeys.length];
     for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      partNames.add(key);
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+      inspectors[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
          TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partObjectInspectors.add(oi);
     }
-    StructObjectInspector partObjectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
-    return ObjectInspectorFactory.getUnionStructObjectInspector(
-        hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
-            Arrays.asList(current, partObjectInspector));
-  }
-
-  private Object[] createPartValue(String[] partKeys, Map partSpec, String[] partKeyTypes) {
-    Object[] partValues = new Object[partKeys.length];
-    for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
-          TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partValues[i] =
-          ObjectInspectorConverters.
-              getConverter(PrimitiveObjectInspectorFactory.
-                  javaStringObjectInspector, oi).convert(partSpec.get(key));
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList(partKeys), Arrays.asList(inspectors));
+  }
+
+  private Object[] createPartValue(PartitionDesc partDesc, StructObjectInspector partOI) {
+    Map partSpec = partDesc.getPartSpec();
+    List fields = partOI.getAllStructFieldRefs();
+    Object[] partValues = new Object[fields.size()];
+    for (int i = 0; i < partValues.length; i++) {
+      StructField field = fields.get(i);
+      String value = partSpec.get(field.getFieldName());
+      ObjectInspector oi = field.getFieldObjectInspector();
+      partValues[i] = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi).convert(value);
     }
     return partValues;
   }
 
-  private void getNextPath() throws Exception {
-    // first time
-    if (iterPath == null) {
-      if (work.isNotPartitioned()) {
-        if (!tblDataDone) {
-          currPath = work.getTblDir();
-          currTbl = work.getTblDesc();
-          if (isNativeTable) {
-            FileSystem fs = currPath.getFileSystem(job);
-            if (fs.exists(currPath)) {
-              FileStatus[] fStats = listStatusUnderPath(fs, currPath);
-              for (FileStatus fStat : fStats) {
-                if (fStat.getLen() > 0) {
-                  tblDataDone = true;
-                  break;
-                }
-              }
-            }
-          } else {
-            tblDataDone = true;
-          }
-
-          if (!tblDataDone) {
-            currPath = null;
-          }
-          return;
-        } else {
-          currTbl = null;
-          currPath = null;
-        }
-        return;
-      } else {
-        iterPath = work.getPartDir().iterator();
-        iterPartDesc = work.getPartDesc().iterator();
-      }
-    }
-
+  private boolean getNextPath() throws Exception {
     while (iterPath.hasNext()) {
-      Path nxt = iterPath.next();
-      PartitionDesc prt = null;
-      if (iterPartDesc != null) {
-        prt = iterPartDesc.next();
+      currPath = iterPath.next();
+      currDesc = iterPartDesc.next();
+      if (isNonNativeTable) {
+        return true;
       }
-      FileSystem fs = nxt.getFileSystem(job);
-      if (fs.exists(nxt)) {
-        FileStatus[] fStats = listStatusUnderPath(fs, nxt);
-        for (FileStatus fStat : fStats) {
+      FileSystem fs = currPath.getFileSystem(job);
+      if (fs.exists(currPath)) {
+        for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
           if (fStat.getLen() > 0) {
-            currPath = nxt;
-            if (iterPartDesc != null) {
-              currPart = prt;
-            }
-            return;
+            return true;
           }
         }
       }
     }
+    return false;
   }
 
   /**
@@ -385,94 +286,40 @@ private void getNextPath() throws Exception {
   private static Map oiSettableProperties = new HashMap();
 
   private RecordReader getRecordReader() throws Exception {
-    if (currPath == null) {
-      getNextPath();
-      if (currPath == null) {
+    if (!iterSplits.hasNext()) {
+      FetchInputFormatSplit[] splits = getNextSplits();
+      if (splits == null) {
         return null;
       }
-
-      // not using FileInputFormat.setInputPaths() here because it forces a
-      // connection
-      // to the default file system - which may or may not be online during pure
-      // metadata
-      // operations
-      job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
-          .toString()));
-
-      // Fetch operator is not vectorized and as such turn vectorization flag off so that
-      // non-vectorized record reader is created below.
-      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
-        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-      }
-
-      PartitionDesc partDesc;
-      if (currTbl == null) {
-        partDesc = currPart;
+      if (convertibleOI == null) {
+        currSerDe = tableSerDe;
+        partTblObjectInspectorConverter = null;
+      } else if (!needConversion(currDesc)) {
+        currSerDe = tableSerDe;
+        partTblObjectInspectorConverter = null;
+        getRowInspectorFromPartitionedTable(currDesc, convertibleOI);
      } else {
-        partDesc = new PartitionDesc(currTbl, null);
-      }
-
-      Class formatter = partDesc.getInputFileFormatClass();
-      inputFormat = getInputFormatFromCache(formatter, job);
-      Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
-      InputSplit[] splits = inputFormat.getSplits(job, 1);
-      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
-      for (int i = 0; i < splits.length; i++) {
-        inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
-      }
-      if (work.getSplitSample() != null) {
-        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
-      }
-      this.inputSplits = inputSplits;
-
-      splitNum = 0;
-      serde = partDesc.getDeserializer(job);
-      SerDeUtils.initializeSerDe(serde, job, partDesc.getTableDesc().getProperties(),
-          partDesc.getProperties());
+        currSerDe = currDesc.getDeserializer(job);
 
-      if (currTbl != null) {
-        tblSerde = serde;
-      }
-      else {
-        tblSerde = currPart.getTableDesc().getDeserializerClass().newInstance();
-        SerDeUtils.initializeSerDe(tblSerde, job, currPart.getTableDesc().getProperties(), null);
-      }
-
-      ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
-          serde.getObjectInspector(),
-          partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI,
-          oiSettableProperties);
-
-      partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-          serde.getObjectInspector(), outputOI);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating fetchTask with deserializer typeinfo: "
-            + serde.getObjectInspector().getTypeName());
-        LOG.debug("deserializer properties:\ntable properties: " +
-            partDesc.getTableDesc().getProperties() + "\npartition properties: " +
-            partDesc.getProperties());
-      }
-
-      if (currPart != null) {
-        getRowInspectorFromPartition(currPart, outputOI);
-      }
-    }
+        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+            currSerDe.getObjectInspector(), convertibleOI);
 
-    if (splitNum >= inputSplits.length) {
-      if (currRecReader != null) {
-        currRecReader.close();
-        currRecReader = null;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Creating fetchTask with deserializer typeinfo: " +
+              currSerDe.getObjectInspector().getTypeName());
+          LOG.debug("deserializer properties:\ntable properties: " +
+              currDesc.getTableDesc().getProperties() + "\npartition properties: " +
+              currDesc.getProperties());
+        }
+        getRowInspectorFromPartitionedTable(currDesc, convertibleOI);
       }
-      currPath = null;
-      return getRecordReader();
+      iterSplits = Arrays.asList(splits).iterator();
    }
 
-    final FetchInputFormatSplit target = inputSplits[splitNum];
+    final FetchInputFormatSplit target = iterSplits.next();
 
     @SuppressWarnings("unchecked")
-    final RecordReader reader =
-        inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+    final RecordReader reader = target.getRecordReader(job);
     if (hasVC || work.getSplitSample() != null) {
       currRecReader = new HiveRecordReader(reader, job) {
        @Override
@@ -487,23 +334,54 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
        }
      };
      ((HiveContextAwareRecordReader)currRecReader).
-          initIOContext(target, job, inputFormat.getClass(), reader);
+          initIOContext(target, job, target.inputFormat.getClass(), reader);
    } else {
      currRecReader = reader;
    }
-    splitNum++;
    key = currRecReader.createKey();
    value = currRecReader.createValue();
+    headerCount = footerCount = 0;
    return currRecReader;
  }
 
+  protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+    while (getNextPath()) {
+      // not using FileInputFormat.setInputPaths() here because it forces a connection to the
+      // default file system - which may or may not be online during pure metadata operations
+      job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
+
+      // Fetch operator is not vectorized and as such turn vectorization flag off so that
+      // non-vectorized record reader is created below.
+      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+      }
+
+      Class formatter = currDesc.getInputFileFormatClass();
+      InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+      Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
+
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+      }
+      if (work.getSplitSample() != null) {
+        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      }
+      if (inputSplits.length > 0) {
+        return inputSplits;
+      }
+    }
+    return null;
+  }
+
   private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
       FetchInputFormatSplit[] splits) {
     long totalSize = 0;
     for (FetchInputFormatSplit split: splits) {
       totalSize += split.getLength();
     }
-    List result = new ArrayList();
+    List result = new ArrayList(splits.length);
     long targetSize = splitSample.getTargetSize(totalSize);
     int startIndex = splitSample.getSeedNum() % splits.length;
     long size = 0;
@@ -572,28 +450,16 @@ public InspectableObject getNextRow() throws IOException {
        * If file contains footer, used FooterBuffer to cache and remove footer
        * records at the end of the file.
        */
-      headerCount = 0;
-      footerCount = 0;
-      TableDesc table = null;
-      if (currTbl != null) {
-        table = currTbl;
-      } else if (currPart != null) {
-        table = currPart.getTableDesc();
-      }
-      if (table != null) {
-        headerCount = Utilities.getHeaderCount(table);
-        footerCount = Utilities.getFooterCount(table, job);
-      }
+      headerCount = Utilities.getHeaderCount(currDesc.getTableDesc());
+      footerCount = Utilities.getFooterCount(currDesc.getTableDesc(), job);
 
       // Skip header lines.
       opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value);
 
       // Initialize footer buffer.
-      if (opNotEOF) {
-        if (footerCount > 0) {
-          footerBuffer = new FooterBuffer();
-          opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
-        }
+      if (opNotEOF && footerCount > 0) {
+        footerBuffer = new FooterBuffer();
+        opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
      }
    }
 
@@ -610,25 +476,24 @@ public InspectableObject getNextRow() throws IOException {
      if (opNotEOF) {
        if (operator != null && context != null && context.inputFileChanged()) {
          // The child operators cleanup if input file has changed
-          try {
-            operator.cleanUpInputFileChanged();
-          } catch (HiveException e) {
-            throw new IOException(e);
-          }
+          operator.cleanUpInputFileChanged();
        }
 
        if (hasVC) {
-          vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde);
-          row[isPartitioned ? 2 : 1] = vcValues;
+          row[isPartitioned ? 2 : 1] =
+              MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, currSerDe);
+        }
+        Object deserialized = currSerDe.deserialize(value);
+        if (partTblObjectInspectorConverter != null) {
+          deserialized = partTblObjectInspectorConverter.convert(deserialized);
        }
-        row[0] = partTblObjectInspectorConverter.convert(serde.deserialize(value));
        if (hasVC || isPartitioned) {
+          row[0] = deserialized;
          inspectable.o = row;
-          inspectable.oi = rowObjectInspector;
-          return inspectable;
+        } else {
+          inspectable.o = deserialized;
        }
-        inspectable.o = row[0];
-        inspectable.oi = tblSerde.getObjectInspector();
+        inspectable.oi = currSerDe.getObjectInspector();
        return inspectable;
      } else {
        currRecReader.close();
@@ -658,13 +523,13 @@ public void clearFetchContext() throws HiveException {
        context.clear();
        context = null;
      }
-      this.currTbl = null;
      this.currPath = null;
      this.iterPath = null;
      this.iterPartDesc = null;
+      this.iterSplits = Iterators.emptyIterator();
    } catch (Exception e) {
      throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+          + StringUtils.stringifyException(e));
    }
  }
 
@@ -673,25 +538,31 @@ public void clearFetchContext() throws HiveException {
   */
  public void setupContext(List paths) {
    this.iterPath = paths.iterator();
-    if (work.isNotPartitioned()) {
-      this.currTbl = work.getTblDesc();
+    List partitionDescs;
+    if (!isPartitioned) {
+      this.iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
    } else {
      this.iterPartDesc = work.getPartDescs(paths).iterator();
    }
-    setupExecContext();
+    this.context = setupExecContext(operator);
  }
 
  /**
   * returns output ObjectInspector, never null
   */
-  public ObjectInspector getOutputObjectInspector() throws HiveException {
-    if(null != work.getStatRowOI()) {
-      return work.getStatRowOI();
-    }
+  public ObjectInspector getOutputObjectInspector() {
+    return outputOI;
+  }
+
+  private ObjectInspector setupOutputObjectInspector() throws HiveException {
+    TableDesc tableDesc = work.getTblDesc();
    try {
-      if (work.isNotPartitioned()) {
-        return getRowInspectorFromTable(work.getTblDesc());
+      tableSerDe = tableDesc.getDeserializer(job);
+      ObjectInspector tableOI = tableSerDe.getObjectInspector();
+      if (!isPartitioned) {
+        return getRowInspectorFromTable(tableDesc, (StructObjectInspector) tableOI);
      }
+      PartitionDesc partDesc = new PartitionDesc(tableDesc, null);
      List listParts = work.getPartDesc();
      // Chose the table descriptor if none of the partitions is present.
      // For eg: consider the query:
@@ -699,39 +570,38 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
      // Both T1 and T2 and partitioned tables, but T1 does not have any partitions
      // FetchOperator is invoked for T1, and listParts is empty. In that case,
      // use T1's schema to get the ObjectInspector.
-      if (listParts == null || listParts.isEmpty()) {
-        return getRowInspectorFromPartitionedTable(work.getTblDesc());
+      if (listParts == null || listParts.isEmpty() || !needConversion(tableDesc, listParts)) {
+        return getRowInspectorFromPartitionedTable(partDesc, tableOI);
      }
+      convertibleOI = ObjectInspectorConverters.getConvertedOI(tableOI, tableOI,
+          oiSettableProperties, false);
+      return getRowInspectorFromPartitionedTable(partDesc, convertibleOI);
+    } catch (Exception e) {
+      throw new HiveException("Failed with exception " + e.getMessage()
+          + StringUtils.stringifyException(e));
+    }
+  }
+
+  private boolean needConversion(PartitionDesc partitionDesc) {
+    return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
+  }
 
-      // Choose any partition. It's OI needs to be converted to the table OI
-      // Whenever a new partition is being read, a new converter is being created
-      PartitionDesc partition = listParts.get(0);
-      Deserializer tblSerde = partition.getTableDesc().getDeserializerClass().newInstance();
-      SerDeUtils.initializeSerDe(tblSerde, job, partition.getTableDesc().getProperties(), null);
-
-      partitionedTableOI = null;
-      ObjectInspector tableOI = tblSerde.getObjectInspector();
-
-      // Get the OI corresponding to all the partitions
-      for (PartitionDesc listPart : listParts) {
-        partition = listPart;
-        Deserializer partSerde = listPart.getDeserializer(job);
-        SerDeUtils.initializeSerDe(partSerde, job, partition.getTableDesc().getProperties(),
-            listPart.getProperties());
-
-        partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
-            partSerde.getObjectInspector(), tableOI, oiSettableProperties);
-        if (!partitionedTableOI.equals(tableOI)) {
-          break;
+  private boolean needConversion(TableDesc tableDesc, List partitionDescs) {
+    String deserializer = tableDesc.getDeserializerClass().getName();
+    Properties tableProps = tableDesc.getProperties();
+    for (PartitionDesc partitionDesc : partitionDescs) {
+      if (!deserializer.equals(partitionDesc.getDeserializerClassName())) {
+        return true;
+      }
+      Properties partProps = partitionDesc.getProperties();
+      for (Map.Entry entry : tableProps.entrySet()) {
+        String value = (String) partProps.get(entry.getKey());
+        if (!org.apache.commons.lang3.StringUtils.equals(value, (String) entry.getValue())) {
+          return true;
        }
      }
-      return getRowInspectorFromPartition(partition, partitionedTableOI);
-    } catch (Exception e) {
-      throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      currPart = null;
    }
+    return false;
  }
 
  /**
@@ -768,9 +638,15 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
    // what's different is that this is evaluated by unit of row using RecordReader.getPos()
    // and that is evaluated by unit of split using InputSplt.getLength().
    private long shrinkedLength = -1;
+    private InputFormat inputFormat;
+
+    public FetchInputFormatSplit(InputSplit split, InputFormat inputFormat) {
+      super(split, inputFormat.getClass().getName());
+      this.inputFormat = inputFormat;
+    }
 
-    public FetchInputFormatSplit(InputSplit split, String name) {
-      super(split, name);
+    public RecordReader getRecordReader(JobConf job) throws IOException {
+      return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL);
    }
  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 5d126a5..4ed3c32 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -133,20 +133,20 @@ public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOE
  }
 
  // random sampling
-  public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
-      Operator operator) {
+  public static FetchOperator createSampler(FetchWork work, HiveConf conf, JobConf job,
+      Operator operator) throws HiveException {
    int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
    float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
    if (samplePercent < 0.0 || samplePercent > 1.0) {
      throw new IllegalArgumentException("Percentile value must be within the range of 0 to 1.");
    }
-    FetchSampler sampler = new FetchSampler(work, job, operator);
+    RandomSampler sampler = new RandomSampler(work, job, operator);
    sampler.setSampleNum(sampleNum);
    sampler.setSamplePercent(samplePercent);
    return sampler;
  }
 
-  private static class FetchSampler extends FetchOperator {
+  private static class RandomSampler extends FetchOperator {
    private int sampleNum = 1000;
    private float samplePercent = 0.1f;
 
@@ -154,7 +154,8 @@ public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf
 
    private int sampled;
 
-    public FetchSampler(FetchWork work, JobConf job, Operator operator) {
+    public RandomSampler(FetchWork work, JobConf job, Operator operator)
+        throws HiveException {
      super(work, job, operator, null);
    }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 7fb4c46..e4a23a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -28,7 +28,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -63,7 +62,6 @@
  private static final String PLAN_KEY = "__MAP_PLAN__";
 
  private MapOperator mo;
-  private Map fetchOperators;
  private OutputCollector oc;
  private JobConf jc;
  private boolean abort = false;
@@ -235,15 +233,6 @@ public void close() {
      }
    }
 
-    if (fetchOperators != null) {
-      MapredLocalWork localWork = mo.getConf().getMapLocalWork();
-      for (Map.Entry entry : fetchOperators.entrySet()) {
-        Operator forwardOp = localWork
-            .getAliasToWork().get(entry.getKey());
-        forwardOp.close(abort);
-      }
-    }
-
    if (isLogInfoEnabled) {
      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
      l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 496f6a6..70c1745 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5975,7 +5975,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
 
    try {
      StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
-          .getDeserializer().getObjectInspector();
+          .getDeserializer(conf).getObjectInspector();
      List fields = rowObjectInspector
          .getAllStructFieldRefs();
      for (int i = 0; i < fields.size(); i++) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 78d4d1f..bb3fd36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * TableDesc.
@@ -79,9 +81,10 @@ public TableDesc(
  /**
   * Return a deserializer object corresponding to the tableDesc.
   */
-  public Deserializer getDeserializer() throws Exception {
-    Deserializer de = getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(de, null, properties, null);
+  public Deserializer getDeserializer(Configuration conf) throws Exception {
+    Deserializer de = ReflectionUtils.newInstance(
+        getDeserializerClass().asSubclass(Deserializer.class), conf);
+    SerDeUtils.initializeSerDe(de, conf, properties, null);
    return de;
  }
 
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
index 8a42577..2a47d97 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
@@ -209,7 +209,7 @@ public static ObjectInspector getConvertedOI(
   * can contain non-settable fields only if inputOI equals outputOI and equalsCheck is
   * true.
   */
-  private static ObjectInspector getConvertedOI(
+  public static ObjectInspector getConvertedOI(
      ObjectInspector inputOI,
      ObjectInspector outputOI,
      Map oiSettableProperties,
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
index 49884b8..cb996a8 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
@@ -478,6 +478,9 @@ public HiveVarcharConverter(PrimitiveObjectInspector inputOI,
 
    @Override
    public Object convert(Object input) {
+      if (input == null) {
+        return null;
+      }
      switch (inputOI.getPrimitiveCategory()) {
      case BOOLEAN:
        return outputOI.set(hc,
@@ -504,6 +507,9 @@ public HiveCharConverter(PrimitiveObjectInspector inputOI,
 
    @Override
    public Object convert(Object input) {
+      if (input == null) {
+        return null;
+      }
      switch (inputOI.getPrimitiveCategory()) {
      case BOOLEAN:
        return outputOI.set(hc,