diff --git data/files/datatypes.txt data/files/datatypes.txt
index 0228a27..458c5bd 100644
--- data/files/datatypes.txt
+++ data/files/datatypes.txt
@@ -1,3 +1,3 @@
-\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N
+\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
+-1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
 1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22 09:00:00.123456789123456789.0123456YWJjZA==2013-01-01abc123abc123X'01FF'
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 395a5f5..e1aa12b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -114,7 +114,6 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
@@ -222,7 +221,7 @@ public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     String tableName = "result";
     List lst = null;
     try {
-      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
     } catch (Exception e) {
       LOG.warn("Error getting schema: "
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 6338c3c..69afefd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -26,8 +26,10 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.lang3.StringEscapeUtils;
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -49,14 +51,13 @@
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -70,74 +71,81 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.AnnotationUtils;
 
 /**
  * FetchTask implementation.
  **/
 public class FetchOperator implements Serializable {
 
-  static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
-  static LogHelper console = new LogHelper(LOG);
+  static final Log LOG = LogFactory.getLog(FetchOperator.class.getName());
+  static final LogHelper console = new LogHelper(LOG);
 
   public static final String FETCH_OPERATOR_DIRECTORY_LIST = "hive.complete.dir.list";
 
-  private boolean isNativeTable;
   private FetchWork work;
-  protected Operator operator; // operator tree for processing row further (option)
-  private int splitNum;
-  private PartitionDesc currPart;
-  private TableDesc currTbl;
-  private boolean tblDataDone;
-  private FooterBuffer footerBuffer = null;
-  private int headerCount = 0;
-  private int footerCount = 0;
-
-  private boolean hasVC;
-  private boolean isPartitioned;
+  private final Operator operator; // operator tree for processing row further (optional)
+
+  private final boolean hasVC;
+  private final boolean isStatReader;
+  private final boolean isPartitioned;
+  private final boolean isNonNativeTable;
   private StructObjectInspector vcsOI;
   private List vcCols;
   private ExecMapperContext context;
+  private transient Deserializer tableSerDe;
+  private transient StructObjectInspector tableOI;
+  private transient StructObjectInspector partKeyOI;
+  private transient StructObjectInspector convertedOI;
+
+  private transient Iterator iterPath;
+  private transient Iterator iterPartDesc;
+  private transient Iterator iterSplits = Iterators.emptyIterator();
+
+  private transient Path currPath;
+  private transient PartitionDesc currDesc;
+  private transient Deserializer currSerDe;
+  private transient Converter ObjectConverter;
   private transient RecordReader currRecReader;
-  private transient FetchInputFormatSplit[] inputSplits;
-  private transient InputFormat inputFormat;
+
   private transient JobConf job;
   private transient WritableComparable key;
   private transient Writable value;
   private transient Object[] vcValues;
-  private transient Deserializer serde;
-  private transient Deserializer tblSerde;
-  private transient Converter partTblObjectInspectorConverter;
-  private transient Iterator iterPath;
-  private transient Iterator iterPartDesc;
-  private transient Path currPath;
-  private transient StructObjectInspector objectInspector;
-  private transient StructObjectInspector rowObjectInspector;
-  private transient ObjectInspector partitionedTableOI;
+  private transient int headerCount;
+  private transient int footerCount;
+  private transient FooterBuffer footerBuffer;
+
+  private transient StructObjectInspector outputOI;
   private transient Object[] row;
 
-  public FetchOperator() {
-  }
-
-  public FetchOperator(FetchWork work, JobConf job) {
-    this.job = job;
-    this.work = work;
-    initialize();
+  public FetchOperator(FetchWork work, JobConf job) throws HiveException {
+    this(work, job, null, null);
   }
 
   public FetchOperator(FetchWork work, JobConf job, Operator operator,
-      List vcCols) {
+      List vcCols) throws HiveException {
     this.job = job;
    this.work = work;
     this.operator = operator;
     this.vcCols = vcCols;
+    this.hasVC = vcCols != null && !vcCols.isEmpty();
+    this.isStatReader = work.getTblDesc() == null;
+    this.isPartitioned = !isStatReader && work.isPartitioned();
+    this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
     initialize();
   }
 
-  private void initialize() {
-    if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+  private void initialize() throws HiveException {
+    if (isStatReader) {
+      outputOI = work.getStatRowOI();
+      return;
+    }
+    if (hasVC) {
       List names = new ArrayList(vcCols.size());
       List inspectors = new ArrayList(vcCols.size());
       for (VirtualColumn vc : vcCols) {
@@ -147,8 +155,6 @@ private void initialize() {
       vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
       vcValues = new Object[vcCols.size()];
     }
-    isPartitioned = work.isPartitioned();
-    tblDataDone = false;
     if (hasVC && isPartitioned) {
       row = new Object[3];
     } else if (hasVC || isPartitioned) {
@@ -156,21 +162,27 @@
     } else {
       row = new Object[1];
     }
-    if (work.getTblDesc() != null) {
-      isNativeTable = !work.getTblDesc().isNonNative();
+    if (isPartitioned) {
+      iterPath = work.getPartDir().iterator();
+      iterPartDesc = work.getPartDesc().iterator();
     } else {
-      isNativeTable = true;
+      iterPath = Arrays.asList(work.getTblDir()).iterator();
+      iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
     }
-    setupExecContext();
+    outputOI = setupOutputObjectInspector();
+    context = setupExecContext(operator, work.getPathLists());
   }
 
-  private void setupExecContext() {
+  private ExecMapperContext setupExecContext(Operator operator, List paths) {
+    ExecMapperContext context = null;
     if (hasVC || work.getSplitSample() != null) {
       context = new ExecMapperContext(job);
       if (operator != null) {
         operator.setExecContext(context);
       }
     }
+    setFetchOperatorContext(job, paths);
+    return context;
   }
 
   public FetchWork getWork() {
@@ -181,42 +193,6 @@ public void setWork(FetchWork work) {
     this.work = work;
   }
 
-  public int getSplitNum() {
-    return splitNum;
-  }
-
-  public void setSplitNum(int splitNum) {
-    this.splitNum = splitNum;
-  }
-
-  public PartitionDesc getCurrPart() {
-    return currPart;
-  }
-
-  public void setCurrPart(PartitionDesc currPart) {
-    this.currPart = currPart;
-  }
-
-  public TableDesc getCurrTbl() {
-    return currTbl;
-  }
-
-  public void setCurrTbl(TableDesc currTbl) {
-    this.currTbl = currTbl;
-  }
-
-  public boolean isTblDataDone() {
-    return tblDataDone;
-  }
-
-  public void setTblDataDone(boolean tblDataDone) {
-    this.tblDataDone = tblDataDone;
-  }
-
-  public boolean isEmptyTable() {
-    return work.getTblDir() == null && (work.getPartDir() == null || work.getPartDir().isEmpty());
-  }
-
   /**
    * A cache of InputFormat instances.
    */
@@ -243,146 +219,54 @@ static InputFormat getInputFormatFromCache(Class inputFor
     return format;
   }
 
-  private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
-  }
-
-  private StructObjectInspector getRowInspectorFromPartition(PartitionDesc partition,
-      ObjectInspector partitionOI) throws Exception {
-
-    String pcols = partition.getTableDesc().getProperties().getProperty(
+  private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
+    String pcols = tableDesc.getProperties().getProperty(
        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = partition.getTableDesc().getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-    String[] partKeyTypes = pcolTypes.trim().split(":");
-    row[1] = createPartValue(partKeys, partition.getPartSpec(), partKeyTypes);
+    String pcolTypes = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
 
-    return createRowInspector(getStructOIFrom(partitionOI), partKeys, partKeyTypes);
-  }
-
-  private StructObjectInspector getRowInspectorFromPartitionedTable(TableDesc table)
-      throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
-    String pcols = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
     String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
-    String[] partKeyTypes = pcolTypes.trim().split(":");
-    row[1] = null;
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()), partKeys, partKeyTypes);
-  }
-
-  private StructObjectInspector getStructOIFrom(ObjectInspector current) throws SerDeException {
-    if (objectInspector != null) {
-      current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
-    } else {
-      current = DelegatedObjectInspectorFactory.wrap(current);
-    }
-    return objectInspector = (StructObjectInspector) current;
-  }
-
-  private StructObjectInspector createRowInspector(StructObjectInspector current)
-      throws SerDeException {
-    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
-        Arrays.asList(current, vcsOI)) : current;
-  }
-
-  private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys, String[] partKeyTypes)
-      throws SerDeException {
-    List partNames = new ArrayList();
-    List partObjectInspectors = new ArrayList();
+    String[] partKeyTypes = pcolTypes.trim().split(":");
+    ObjectInspector[] inspectors = new ObjectInspector[partKeys.length];
     for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      partNames.add(key);
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+      inspectors[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
          TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partObjectInspectors.add(oi);
    }
-    StructObjectInspector partObjectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
-    return ObjectInspectorFactory.getUnionStructObjectInspector(
-        hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
-        Arrays.asList(current, partObjectInspector));
-  }
-
-  private Object[] createPartValue(String[] partKeys, Map partSpec, String[] partKeyTypes) {
-    Object[] partValues = new Object[partKeys.length];
-    for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
-          TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partValues[i] =
-          ObjectInspectorConverters.
-          getConverter(PrimitiveObjectInspectorFactory.
-          javaStringObjectInspector, oi).convert(partSpec.get(key));
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList(partKeys), Arrays.asList(inspectors));
+  }
+
+  private Object[] createPartValue(PartitionDesc partDesc, StructObjectInspector partOI) {
+    Map partSpec = partDesc.getPartSpec();
+    List fields = partOI.getAllStructFieldRefs();
+    Object[] partValues = new Object[fields.size()];
+    for (int i = 0; i < partValues.length; i++) {
+      StructField field = fields.get(i);
+      String value = partSpec.get(field.getFieldName());
+      ObjectInspector oi = field.getFieldObjectInspector();
+      partValues[i] = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi).convert(value);
     }
     return partValues;
   }
 
-  private void getNextPath() throws Exception {
-    // first time
-    if (iterPath == null) {
-      if (work.isNotPartitioned()) {
-        if (!tblDataDone) {
-          currPath = work.getTblDir();
-          currTbl = work.getTblDesc();
-          if (isNativeTable) {
-            FileSystem fs = currPath.getFileSystem(job);
-            if (fs.exists(currPath)) {
-              FileStatus[] fStats = listStatusUnderPath(fs, currPath);
-              for (FileStatus fStat : fStats) {
-                if (fStat.getLen() > 0) {
-                  tblDataDone = true;
-                  break;
-                }
-              }
-            }
-          } else {
-            tblDataDone = true;
-          }
-
-          if (!tblDataDone) {
-            currPath = null;
-          }
-          return;
-        } else {
-          currTbl = null;
-          currPath = null;
-        }
-        return;
-      } else {
-        setFetchOperatorContext(job, work.getPartDir());
-        iterPath = work.getPartDir().iterator();
-        iterPartDesc = work.getPartDesc().iterator();
-      }
-    }
-
+  private boolean getNextPath() throws Exception {
     while (iterPath.hasNext()) {
-      Path nxt = iterPath.next();
-      PartitionDesc prt = null;
-      if (iterPartDesc != null) {
-        prt = iterPartDesc.next();
+      currPath = iterPath.next();
+      currDesc = iterPartDesc.next();
+      if (isNonNativeTable) {
+        return true;
       }
-      FileSystem fs = nxt.getFileSystem(job);
-      if (fs.exists(nxt)) {
-        FileStatus[] fStats = listStatusUnderPath(fs, nxt);
-        for (FileStatus fStat : fStats) {
+      FileSystem fs = currPath.getFileSystem(job);
+      if (fs.exists(currPath)) {
+        for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
           if (fStat.getLen() > 0) {
-            currPath = nxt;
-            if (iterPartDesc != null) {
-              currPart = prt;
-            }
-            return;
+            return true;
           }
         }
       }
     }
+    return false;
   }
 
   /**
@@ -390,119 +274,53 @@ private void getNextPath() throws Exception {
    * This helps InputFormats make decisions based on the scope of the complete
    * operation.
    * @param conf the configuration to modify
-   * @param partDirs the list of partition directories
+   * @param paths the list of input directories
    */
-  static void setFetchOperatorContext(JobConf conf,
-      ArrayList partDirs) {
-    if (partDirs != null) {
+  static void setFetchOperatorContext(JobConf conf, List paths) {
+    if (paths != null) {
       StringBuilder buff = new StringBuilder();
-      boolean first = true;
-      for(Path p: partDirs) {
-        if (first) {
-          first = false;
-        } else {
+      for (Path path : paths) {
+        if (buff.length() > 0) {
          buff.append('\t');
        }
-        buff.append(StringEscapeUtils.escapeJava(p.toString()));
+        buff.append(StringEscapeUtils.escapeJava(path.toString()));
       }
       conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
     }
   }
 
-  /**
-   * A cache of Object Inspector Settable Properties.
-   */
-  private static Map oiSettableProperties = new HashMap();
-
   private RecordReader getRecordReader() throws Exception {
-    if (currPath == null) {
-      getNextPath();
-      if (currPath == null) {
+    if (!iterSplits.hasNext()) {
+      FetchInputFormatSplit[] splits = getNextSplits();
+      if (splits == null) {
         return null;
       }
-
-      // not using FileInputFormat.setInputPaths() here because it forces a
-      // connection
-      // to the default file system - which may or may not be online during pure
-      // metadata
-      // operations
-      job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
-          .toString()));
-
-      // Fetch operator is not vectorized and as such turn vectorization flag off so that
-      // non-vectorized record reader is created below.
-      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
-        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-      }
-
-      PartitionDesc partDesc;
-      if (currTbl == null) {
-        partDesc = currPart;
+      if (!isPartitioned || convertedOI == null) {
+        currSerDe = tableSerDe;
+        ObjectConverter = null;
       } else {
-        partDesc = new PartitionDesc(currTbl, null);
-      }
-
-      Class formatter = partDesc.getInputFileFormatClass();
-      inputFormat = getInputFormatFromCache(formatter, job);
-      Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
-      InputSplit[] splits = inputFormat.getSplits(job, 1);
-      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
-      for (int i = 0; i < splits.length; i++) {
-        inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
-      }
-      if (work.getSplitSample() != null) {
-        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
-      }
-      this.inputSplits = inputSplits;
-
-      splitNum = 0;
-      serde = partDesc.getDeserializer(job);
-      SerDeUtils.initializeSerDe(serde, job, partDesc.getTableDesc().getProperties(),
-          partDesc.getProperties());
-
-      if (currTbl != null) {
-        tblSerde = serde;
+        currSerDe = needConversion(currDesc) ? currDesc.getDeserializer(job) : tableSerDe;
+        ObjectInspector inputOI = currSerDe.getObjectInspector();
+        ObjectConverter = ObjectInspectorConverters.getConverter(inputOI, convertedOI);
       }
-      else {
-        tblSerde = currPart.getTableDesc().getDeserializerClass().newInstance();
-        SerDeUtils.initializeSerDe(tblSerde, job, currPart.getTableDesc().getProperties(), null);
+      if (isPartitioned) {
+        row[1] = createPartValue(currDesc, partKeyOI);
      }
-
-      ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
-          serde.getObjectInspector(),
-          partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI,
-          oiSettableProperties);
-
-      partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-          serde.getObjectInspector(), outputOI);
+      iterSplits = Arrays.asList(splits).iterator();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating fetchTask with deserializer typeinfo: "
-            + serde.getObjectInspector().getTypeName());
+            + currSerDe.getObjectInspector().getTypeName());
         LOG.debug("deserializer properties:\ntable properties: " +
-            partDesc.getTableDesc().getProperties() + "\npartition properties: " +
-            partDesc.getProperties());
-      }
-
-      if (currPart != null) {
-        getRowInspectorFromPartition(currPart, outputOI);
-      }
-    }
-
-    if (splitNum >= inputSplits.length) {
-      if (currRecReader != null) {
-        currRecReader.close();
-        currRecReader = null;
+            currDesc.getTableDesc().getProperties() + "\npartition properties: " +
+            currDesc.getProperties());
       }
-      currPath = null;
-      return getRecordReader();
     }
 
-    final FetchInputFormatSplit target = inputSplits[splitNum];
+    final FetchInputFormatSplit target = iterSplits.next();
 
     @SuppressWarnings("unchecked")
-    final RecordReader reader =
-        inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+    final RecordReader reader = target.getRecordReader(job);
     if (hasVC || work.getSplitSample() != null) {
       currRecReader = new HiveRecordReader(reader, job) {
         @Override
@@ -517,23 +335,52 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
        }
      };
      ((HiveContextAwareRecordReader)currRecReader).
-          initIOContext(target, job, inputFormat.getClass(), reader);
+          initIOContext(target, job, target.inputFormat.getClass(), reader);
    } else {
      currRecReader = reader;
    }
-    splitNum++;
    key = currRecReader.createKey();
    value = currRecReader.createValue();
+    headerCount = footerCount = 0;
    return currRecReader;
  }
 
+  protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+    while (getNextPath()) {
+      // not using FileInputFormat.setInputPaths() here because it forces a connection to the
+      // default file system - which may or may not be online during pure metadata operations
+      job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
+
+      // Fetch operator is not vectorized and as such turn vectorization flag off so that
+      // non-vectorized record reader is created below.
+      HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+
+      Class formatter = currDesc.getInputFileFormatClass();
+      Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
+      InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+      }
+      if (work.getSplitSample() != null) {
+        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      }
+      if (inputSplits.length > 0) {
+        return inputSplits;
+      }
+    }
+    return null;
+  }
+
  private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
      FetchInputFormatSplit[] splits) {
    long totalSize = 0;
    for (FetchInputFormatSplit split: splits) {
      totalSize += split.getLength();
    }
-    List result = new ArrayList();
+    List result = new ArrayList(splits.length);
    long targetSize = splitSample.getTargetSize(totalSize);
    int startIndex = splitSample.getSeedNum() % splits.length;
    long size = 0;
@@ -557,18 +404,18 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
   * Currently only used by FetchTask.
   **/
  public boolean pushRow() throws IOException, HiveException {
-    if(work.getRowsComputedUsingStats() != null) {
+    if (work.getRowsComputedUsingStats() != null) {
      for (List row : work.getRowsComputedUsingStats()) {
        operator.processOp(row, 0);
      }
-      operator.flush();
+      flushRow();
      return true;
    }
    InspectableObject row = getNextRow();
    if (row != null) {
      pushRow(row);
    } else {
-      operator.flush();
+      flushRow();
    }
    return row != null;
  }
@@ -577,6 +424,10 @@ protected void pushRow(InspectableObject row) throws HiveException {
    operator.processOp(row.o, 0);
  }
 
+  protected void flushRow() throws HiveException {
+    operator.flush();
+  }
+
  private transient final InspectableObject inspectable = new InspectableObject();
 
  /**
@@ -602,28 +453,16 @@ public InspectableObject getNextRow() throws IOException {
         * If file contains footer, used FooterBuffer to cache and remove footer
         * records at the end of the file.
         */
-        headerCount = 0;
-        footerCount = 0;
-        TableDesc table = null;
-        if (currTbl != null) {
-          table = currTbl;
-        } else if (currPart != null) {
-          table = currPart.getTableDesc();
-        }
-        if (table != null) {
-          headerCount = Utilities.getHeaderCount(table);
-          footerCount = Utilities.getFooterCount(table, job);
-        }
+        headerCount = Utilities.getHeaderCount(currDesc.getTableDesc());
+        footerCount = Utilities.getFooterCount(currDesc.getTableDesc(), job);
 
        // Skip header lines.
        opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value);
 
        // Initialize footer buffer.
-        if (opNotEOF) {
-          if (footerCount > 0) {
-            footerBuffer = new FooterBuffer();
-            opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
-          }
+        if (opNotEOF && footerCount > 0) {
+          footerBuffer = new FooterBuffer();
+          opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
        }
      }
@@ -640,25 +479,24 @@ public InspectableObject getNextRow() throws IOException {
      if (opNotEOF) {
        if (operator != null && context != null && context.inputFileChanged()) {
          // The child operators cleanup if input file has changed
-          try {
-            operator.cleanUpInputFileChanged();
-          } catch (HiveException e) {
-            throw new IOException(e);
-          }
+          operator.cleanUpInputFileChanged();
        }
        if (hasVC) {
-          vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde);
-          row[isPartitioned ? 2 : 1] = vcValues;
+          row[isPartitioned ? 2 : 1] =
+              MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, currSerDe);
+        }
+        Object deserialized = currSerDe.deserialize(value);
+        if (ObjectConverter != null) {
+          deserialized = ObjectConverter.convert(deserialized);
        }
-        row[0] = partTblObjectInspectorConverter.convert(serde.deserialize(value));
        if (hasVC || isPartitioned) {
+          row[0] = deserialized;
          inspectable.o = row;
-          inspectable.oi = rowObjectInspector;
-          return inspectable;
+        } else {
+          inspectable.o = deserialized;
        }
-        inspectable.o = row[0];
-        inspectable.oi = tblSerde.getObjectInspector();
+        inspectable.oi = currSerDe.getObjectInspector();
        return inspectable;
      } else {
        currRecReader.close();
@@ -680,21 +518,20 @@ public void clearFetchContext() throws HiveException {
        currRecReader.close();
        currRecReader = null;
      }
-      if (operator != null) {
+      if (!operator.isClosed()) {
        operator.close(false);
-        operator = null;
      }
      if (context != null) {
        context.clear();
        context = null;
      }
-      this.currTbl = null;
      this.currPath = null;
      this.iterPath = null;
      this.iterPartDesc = null;
+      this.iterSplits = Iterators.emptyIterator();
    } catch (Exception e) {
      throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+          + StringUtils.stringifyException(e));
    }
  }
@@ -703,25 +540,33 @@
   */
  public void setupContext(List paths) {
    this.iterPath = paths.iterator();
-    if (work.isNotPartitioned()) {
-      this.currTbl = work.getTblDesc();
+    List partitionDescs;
+    if (!isPartitioned) {
+      this.iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
    } else {
      this.iterPartDesc = work.getPartDescs(paths).iterator();
    }
-    setupExecContext();
+    this.context = setupExecContext(operator, paths);
  }
 
  /**
   * returns output ObjectInspector, never null
   */
-  public ObjectInspector getOutputObjectInspector() throws HiveException {
-    if(null != work.getStatRowOI()) {
-      return work.getStatRowOI();
-    }
+  public ObjectInspector getOutputObjectInspector() {
+    return outputOI;
+  }
+
+  private StructObjectInspector setupOutputObjectInspector() throws HiveException {
+    TableDesc tableDesc = work.getTblDesc();
    try {
-      if (work.isNotPartitioned()) {
-        return getRowInspectorFromTable(work.getTblDesc());
+      tableSerDe = tableDesc.getDeserializer(job, true);
+      tableOI = (StructObjectInspector) tableSerDe.getObjectInspector();
+      if (!isPartitioned) {
+        return getTableRowOI(tableOI);
      }
+      partKeyOI = getPartitionKeyOI(tableDesc);
+
+      PartitionDesc partDesc = new PartitionDesc(tableDesc, null);
      List listParts = work.getPartDesc();
      // Chose the table descriptor if none of the partitions is present.
      // For eg: consider the query:
@@ -729,39 +574,50 @@
      // Both T1 and T2 and partitioned tables, but T1 does not have any partitions
      // FetchOperator is invoked for T1, and listParts is empty. In that case,
      // use T1's schema to get the ObjectInspector.
-      if (listParts == null || listParts.isEmpty()) {
-        return getRowInspectorFromPartitionedTable(work.getTblDesc());
+      if (listParts == null || listParts.isEmpty() || !needConversion(tableDesc, listParts)) {
+        return getPartitionedRowOI(tableOI);
      }
+      convertedOI = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
+          tableOI, tableOI, null, false);
+      return getPartitionedRowOI(convertedOI);
+    } catch (Exception e) {
+      throw new HiveException("Failed with exception " + e.getMessage()
+          + StringUtils.stringifyException(e));
+    }
+  }
+
+  private StructObjectInspector getTableRowOI(StructObjectInspector valueOI) {
+    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
+        Arrays.asList(valueOI, vcsOI)) : valueOI;
+  }
+
+  private StructObjectInspector getPartitionedRowOI(StructObjectInspector valueOI) {
+    return ObjectInspectorFactory.getUnionStructObjectInspector(
+        hasVC ? Arrays.asList(valueOI, partKeyOI, vcsOI) : Arrays.asList(valueOI, partKeyOI));
+  }
 
-      // Choose any partition. It's OI needs to be converted to the table OI
-      // Whenever a new partition is being read, a new converter is being created
-      PartitionDesc partition = listParts.get(0);
-      Deserializer tblSerde = partition.getTableDesc().getDeserializerClass().newInstance();
-      SerDeUtils.initializeSerDe(tblSerde, job, partition.getTableDesc().getProperties(), null);
-
-      partitionedTableOI = null;
-      ObjectInspector tableOI = tblSerde.getObjectInspector();
-
-      // Get the OI corresponding to all the partitions
-      for (PartitionDesc listPart : listParts) {
-        partition = listPart;
-        Deserializer partSerde = listPart.getDeserializer(job);
-        SerDeUtils.initializeSerDe(partSerde, job, partition.getTableDesc().getProperties(),
-            listPart.getProperties());
-
-        partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
-            partSerde.getObjectInspector(), tableOI, oiSettableProperties);
-        if (!partitionedTableOI.equals(tableOI)) {
-          break;
+  private boolean needConversion(PartitionDesc partitionDesc) {
+    return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
+  }
+
+  // if table and all partitions have the same schema and serde, no need to convert
+  private boolean needConversion(TableDesc tableDesc, List partDescs) {
+    Class tableSerDe = tableDesc.getDeserializerClass();
+    String[] schemaProps = AnnotationUtils.getAnnotation(tableSerDe, SerDeSpec.class).schemaProps();
+    Properties tableProps = tableDesc.getProperties();
+    for (PartitionDesc partitionDesc : partDescs) {
+      if (!tableSerDe.getName().equals(partitionDesc.getDeserializerClassName())) {
+        return true;
+      }
+      Properties partProps = partitionDesc.getProperties();
+      for (String schemaProp : schemaProps) {
+        if (!org.apache.commons.lang3.StringUtils.equals(
+            tableProps.getProperty(schemaProp), partProps.getProperty(schemaProp))) {
+          return true;
        }
      }
-      return getRowInspectorFromPartition(partition, partitionedTableOI);
-    } catch (Exception e) {
-      throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      currPart = null;
    }
+    return false;
  }
 
  /**
@@ -797,11 +653,17 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
    // shrinked size for this split. counter part of this in normal mode is
    // InputSplitShim.shrinkedLength.
    // what's different is that this is evaluated by unit of row using RecordReader.getPos()
-    // and that is evaluated by unit of split using InputSplt.getLength().
+    // and that is evaluated by unit of split using InputSplit.getLength().
    private long shrinkedLength = -1;
+    private InputFormat inputFormat;
+
+    public FetchInputFormatSplit(InputSplit split, InputFormat inputFormat) {
+      super(split, inputFormat.getClass().getName());
+      this.inputFormat = inputFormat;
+    }
 
-    public FetchInputFormatSplit(InputSplit split, String name) {
-      super(split, name);
+    public RecordReader getRecordReader(JobConf job) throws IOException {
+      return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL);
    }
  }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index d2b5c05..e6e3410 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -299,6 +299,10 @@ public void setAlias(String alias) {
     return (ret);
   }
 
+  public boolean isClosed() {
+    return state == State.CLOSE;
+  }
+
   /**
    * checks whether all parent operators are initialized or not.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
index 5d126a5..96f4530 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
@@ -133,20 +133,20 @@ public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOE
   }
 
   // random sampling
-  public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
-      Operator operator) {
+  public static FetchOperator createSampler(FetchWork work, HiveConf conf, JobConf job,
+      Operator operator) throws HiveException {
     int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
     float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
     if (samplePercent < 0.0 || samplePercent > 1.0) {
       throw new IllegalArgumentException("Percentile value must be within the range of 0 to 1.");
     }
-    FetchSampler sampler = new FetchSampler(work, job, operator);
+    RandomSampler sampler = new RandomSampler(work, job, operator);
     sampler.setSampleNum(sampleNum);
     sampler.setSamplePercent(samplePercent);
     return sampler;
   }
 
-  private static class FetchSampler extends FetchOperator {
+  private static class RandomSampler extends FetchOperator {
     private int sampleNum = 1000;
     private float samplePercent = 0.1f;
@@ -154,7 +154,8 @@ public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf
 
     private int sampled;
 
-    public FetchSampler(FetchWork work, JobConf job, Operator operator) {
+    public RandomSampler(FetchWork work, JobConf job, Operator operator)
+        throws HiveException {
       super(work, job, operator, null);
     }
 
@@ -174,7 +175,7 @@ public boolean pushRow() throws IOException, HiveException {
       if (sampled < sampleNum) {
         return true;
       }
-      operator.flush();
+      flushRow();
       return false;
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index ca65a8e..94ae932 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.List;
@@ -30,8 +28,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -54,7 +50,7 @@
 /**
  * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
  * the bridge between the map-reduce framework and the Hive operator pipeline at
- * execution time. It's main responsabilities are:
+ * execution time. It's main responsibilities are:
  *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
@@ -66,7 +62,6 @@
   private static final String PLAN_KEY = "__MAP_PLAN__";
 
   private MapOperator mo;
-  private Map fetchOperators;
   private OutputCollector oc;
   private JobConf jc;
   private boolean abort = false;
@@ -74,7 +69,6 @@
   public static final Log l4j = LogFactory.getLog(ExecMapper.class);
   private static boolean done;
 
-  // used to log memory usage periodically
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
@@ -213,15 +207,6 @@ public void close() {
         }
       }
 
-      if (fetchOperators != null) {
-        MapredLocalWork localWork = mo.getConf().getMapRedLocalWork();
-        for (Map.Entry entry : fetchOperators.entrySet()) {
-          Operator forwardOp = localWork
-              .getAliasToWork().get(entry.getKey());
-          forwardOp.close(abort);
-        }
-      }
-
       ReportStats rps = new ReportStats(rp, jc);
       mo.preorderMap(rps);
       return;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6c8624f..53c8fcb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6369,7 +6369,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
     } else {
       try {
         StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
-            .getDeserializer().getObjectInspector();
+            .getDeserializer(conf).getObjectInspector();
         List fields = rowObjectInspector
             .getAllStructFieldRefs();
         for (int i = 0; i < fields.size(); i++) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index 32d84ea..7cb9031 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.TreeMap;
 
@@ -27,7 +28,7 @@
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * FetchWork.
@@ -52,7 +53,7 @@
   private SplitSample splitSample;
 
   private transient List> rowsComputedFromStats;
-  private transient ObjectInspector statRowOI;
+  private transient StructObjectInspector statRowOI;
 
   /**
    * Serialization Null Format for the serde used to fetch data.
@@ -62,12 +63,12 @@
   public FetchWork() {
   }
 
-  public FetchWork(List> rowsComputedFromStats,ObjectInspector statRowOI) {
+  public FetchWork(List> rowsComputedFromStats, StructObjectInspector statRowOI) {
     this.rowsComputedFromStats = rowsComputedFromStats;
     this.statRowOI = statRowOI;
   }
 
-  public ObjectInspector getStatRowOI() {
+  public StructObjectInspector getStatRowOI() {
     return statRowOI;
   }
 
@@ -173,6 +174,11 @@ public void setPartDir(ArrayList partDir) {
     return partDesc;
   }
 
+  public List getPathLists() {
+    return isPartitioned() ? partDir == null ?
+        null : new ArrayList(partDir) : Arrays.asList(tblDir);
+  }
+
   /**
    * Get Partition descriptors in sorted (ascending) order of partition directory
   *
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 374e8b6..cb36c86 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * TableDesc.
@@ -83,8 +84,17 @@ public Deserializer getDeserializer() throws Exception {
    * Return a deserializer object corresponding to the tableDesc.
    */
   public Deserializer getDeserializer(Configuration conf) throws Exception {
-    Deserializer de = getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(de, conf, properties, null);
+    return getDeserializer(conf, false);
+  }
+
+  public Deserializer getDeserializer(Configuration conf, boolean ignoreError) throws Exception {
+    Deserializer de = ReflectionUtils.newInstance(
+        getDeserializerClass().asSubclass(Deserializer.class), conf);
+    if (ignoreError) {
+      SerDeUtils.initializeSerDeWithoutErrorCheck(de, conf, properties, null);
+    } else {
+      SerDeUtils.initializeSerDe(de, conf, properties, null);
+    }
     return de;
   }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java
deleted file mode 100644
index 6a9215b..0000000
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.serde2.objectinspector;
-
-import java.util.List;
-
-public class DelegatedListObjectInspector implements ListObjectInspector {
-
-  private ListObjectInspector delegate;
-  private ObjectInspector element;
-
-  protected DelegatedListObjectInspector() {
-    super();
-  }
-  public DelegatedListObjectInspector(ListObjectInspector delegate) {
-    this.delegate = delegate;
-  }
-
-  public void reset(ListObjectInspector delegate) {
-    this.delegate = delegate;
-    if (element != null) {
-      DelegatedObjectInspectorFactory.reset(element, delegate.getListElementObjectInspector());
-    }
-  }
-
-  public ObjectInspector getListElementObjectInspector() {
-    return element != null ? element :
-        (element = DelegatedObjectInspectorFactory.wrap(delegate.getListElementObjectInspector()));
-  }
-
-  public Object getListElement(Object data, int index) {
-    return delegate.getListElement(data, index);
-  }
-
-  public int getListLength(Object data) {
-    return delegate.getListLength(data);
-  }
-
-  public List getList(Object data) {
-    return delegate.getList(data);
-  }
-
-  public String getTypeName() {
-    return delegate.getTypeName();
-  }
-
-  public Category getCategory() {
-    return delegate.getCategory();
-  }
-}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java
deleted file mode 100644
index 975d5cd..0000000
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.serde2.objectinspector;
-
-import java.util.Map;
-
-public class DelegatedMapObjectInspector implements MapObjectInspector {
-
-  private MapObjectInspector delegate;
-  private ObjectInspector key;
-  private ObjectInspector value;
-
-  protected DelegatedMapObjectInspector() {
-    super();
-  }
-  public DelegatedMapObjectInspector(MapObjectInspector delegate) {
-    this.delegate = delegate;
-  }
-
-  public void reset(MapObjectInspector current) {
-    this.delegate = current;
-    if (key != null) {
-      DelegatedObjectInspectorFactory.reset(key, current.getMapKeyObjectInspector());
-    }
-    if (value != null) {
-      DelegatedObjectInspectorFactory.reset(value, current.getMapValueObjectInspector());
-    }
-  }
-
-  public ObjectInspector getMapKeyObjectInspector() {
-    return key != null ? key :
-        (key = DelegatedObjectInspectorFactory.wrap(delegate.getMapKeyObjectInspector()));
-  }
-
-  public ObjectInspector getMapValueObjectInspector() {
-    return value != null ? value :
-        (value = DelegatedObjectInspectorFactory.wrap(delegate.getMapValueObjectInspector()));
-  }
-
-  public Object getMapValueElement(Object data, Object key) {
-    return delegate.getMapValueElement(data, key);
-  }
-
-  public Map getMap(Object data) {
-    return delegate.getMap(data);
-  }
-
-  public int getMapSize(Object data) {
-    return delegate.getMapSize(data);
-  }
-
-  public String getTypeName() {
-    return delegate.getTypeName();
-  }
-
-  public Category getCategory() {
-    return delegate.getCategory();
-  }
-}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java
deleted file mode 100644
index 2db3819..0000000
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.serde2.objectinspector;
-
-public class DelegatedObjectInspectorFactory {
-
-  public static ObjectInspector wrap(ObjectInspector inspector) {
-    switch (inspector.getCategory()) {
-      case PRIMITIVE:
-        return inspector;
-      case LIST:
-        return new DelegatedListObjectInspector((ListObjectInspector) inspector);
-      case MAP:
-        return new DelegatedMapObjectInspector((MapObjectInspector) inspector);
-      case STRUCT:
-        return new DelegatedStructObjectInspector((StructObjectInspector) inspector);
-      case UNION:
-        return new DelegatedUnionObjectInspector((UnionObjectInspector) inspector);
-      default:
-        throw new RuntimeException("invalid category " + inspector.getCategory());
-    }
-  }
-
-  public static ObjectInspector reset(ObjectInspector prev, ObjectInspector current) {
-    switch (current.getCategory()) {
-      case PRIMITIVE:
-        break;
-      case LIST:
-        ((DelegatedListObjectInspector)prev).reset((ListObjectInspector) current);
-        break;
-      case MAP:
-        ((DelegatedMapObjectInspector)prev).reset((MapObjectInspector) current);
-        break;
-      case STRUCT:
-        ((DelegatedStructObjectInspector)prev).reset((StructObjectInspector) current);
-        break;
-      case UNION:
-        ((DelegatedUnionObjectInspector)prev).reset((UnionObjectInspector) current);
-        break;
-      default:
-        throw new RuntimeException("invalid category " + current.getCategory());
-    }
-    return prev;
-  }
-}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java
deleted file mode 100644
index ef66e97..0000000
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.serde2.objectinspector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DelegatedStructObjectInspector extends StructObjectInspector {
-
-  private StructObjectInspector delegate;
-  private List fields;
-
-  protected DelegatedStructObjectInspector() {
-    super();
-  }
-  public DelegatedStructObjectInspector(StructObjectInspector delegate) {
-    this.delegate = delegate;
-  }
-
-  public void reset(StructObjectInspector current) {
-    this.delegate = current;
-    if (fields != null) {
-      int index = 0;
-      List newFields = delegate.getAllStructFieldRefs();
-      for (DelegatedStructField field : fields) {
-        field.field = newFields.get(index++);
-      }
-    }
-  }
-
-  private static class DelegatedStructField implements StructField {
-    private StructField field;
-
-    public DelegatedStructField(StructField field) {
-      this.field = field;
-    }
-    public String getFieldName() {
-      return field.getFieldName();
-    }
-    public ObjectInspector getFieldObjectInspector() {
-      return field.getFieldObjectInspector();
-    }
-    public int getFieldID() {
-      return field.getFieldID();
-    }
-    public String getFieldComment() {
-      return field.getFieldComment();
-    }
-  }
-
-  @Override
-  public List getAllStructFieldRefs() {
-    if (fields != null || delegate.getAllStructFieldRefs() == null) {
-      return fields;
-    }
-    List fields = delegate.getAllStructFieldRefs();
-    List delegate = new ArrayList(fields.size());
-    for (StructField field : fields) {
-      delegate.add(new DelegatedStructField(field));
-    }
-    return this.fields = delegate;
-  }
-
-  @Override
-  public StructField getStructFieldRef(String fieldName) {
-    StructField field = delegate.getStructFieldRef(fieldName);
-    return field == null ? null : new DelegatedStructField(field);
-  }
-
-  @Override
-  public Object getStructFieldData(Object data, StructField fieldRef) {
-    return delegate.getStructFieldData(data, ((DelegatedStructField) fieldRef).field);
-  }
-
-  @Override
-  public List getStructFieldsDataAsList(Object data) {
-    return delegate.getStructFieldsDataAsList(data);
-  }
-
-  public String getTypeName() {
-    return delegate.getTypeName();
-  }
-
-  public Category getCategory() {
-    return delegate.getCategory();
-  }
-}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java
deleted file mode 100644
index 521fdd6..0000000
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.serde2.objectinspector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DelegatedUnionObjectInspector implements UnionObjectInspector {
-
-  private UnionObjectInspector delegate;
-  private List children;
-
-  protected DelegatedUnionObjectInspector() {
-    super();
-  }
-  public DelegatedUnionObjectInspector(UnionObjectInspector delegate) {
-    this.delegate = delegate;
-  }
-
-  public void reset(UnionObjectInspector current) {
-    this.delegate = current;
-    if (children != null) {
-      int index = 0;
-      List newOIs = delegate.getObjectInspectors();
-      for (ObjectInspector child : children) {
-        DelegatedObjectInspectorFactory.reset(child, newOIs.get(index++));
-      }
-    }
-  }
-
-  public List getObjectInspectors() {
-    if (children != null || delegate.getObjectInspectors() == null) {
-      return children;
-    }
-    List inspectors = delegate.getObjectInspectors();
-    List delegated = new ArrayList();
-    for (ObjectInspector inspector : inspectors) {
-      delegated.add(DelegatedObjectInspectorFactory.wrap(inspector));
-    }
-    return children = delegated;
-  }
-
-  public byte getTag(Object o) {
-    return delegate.getTag(o);
-  }
-
-  public Object getField(Object o) {
-    return delegate.getField(o);
-  }
-
-  public String getTypeName() {
-    return delegate.getTypeName();
-  }
-
-  public Category getCategory() {
-    return delegate.getCategory();
-  }
-}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
index 8a42577..2a47d97 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
@@ -209,7 +209,7 @@ public static ObjectInspector getConvertedOI(
    * can contain non-settable fields only if inputOI equals outputOI and equalsCheck is
    * true.
    */
-  private static ObjectInspector getConvertedOI(
+  public static ObjectInspector getConvertedOI(
       ObjectInspector inputOI,
       ObjectInspector outputOI,
       Map oiSettableProperties,
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
index 49884b8..cb996a8 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
@@ -478,6 +478,9 @@ public HiveVarcharConverter(PrimitiveObjectInspector inputOI,
 
   @Override
   public Object convert(Object input) {
+    if (input == null) {
+      return null;
+    }
     switch (inputOI.getPrimitiveCategory()) {
     case BOOLEAN:
       return outputOI.set(hc,
@@ -504,6 +507,9 @@ public HiveCharConverter(PrimitiveObjectInspector inputOI,
 
   @Override
   public Object convert(Object input) {
+    if (input == null) {
+      return null;
+    }
     switch (inputOI.getPrimitiveCategory()) {
     case BOOLEAN:
       return outputOI.set(hc,