diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6a1a5f0..b869d72 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -598,8 +598,6 @@ new TimeValidator(TimeUnit.SECONDS), "How long to run autoprogressor for the script/UDTF operators.\n" + "Set to 0 for forever."), - HIVETABLENAME("hive.table.name", "", ""), - HIVEPARTITIONNAME("hive.partition.name", "", ""), HIVESCRIPTAUTOPROGRESS("hive.script.auto.progress", false, "Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker \n" + "to avoid the task getting killed because of inactivity. Hive sends progress information when the script is \n" + diff --git data/conf/hive-log4j.properties data/conf/hive-log4j.properties index 7f5dfc4..f90b833 100644 --- data/conf/hive-log4j.properties +++ data/conf/hive-log4j.properties @@ -75,6 +75,11 @@ log4j.category.JPOX.Query=ERROR,DRFA log4j.category.JPOX.General=ERROR,DRFA log4j.category.JPOX.Enhancer=ERROR,DRFA log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA +log4j.logger.org.apache.zookeeper=INFO,DRFA +log4j.logger.org.apache.zookeeper.server.ServerCnxn=WARN,DRFA log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA +log4j.logger.org.apache.zookeeper.ClientCnxn=WARN,DRFA +log4j.logger.org.apache.zookeeper.ClientCnxnSocket=WARN,DRFA log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA - +log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=WARN,DRFA +log4j.logger.org.apache.hadoop.hive.ql.exec.Operator=INFO,DRFA diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index f624bf4..38ae020 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -23,20 +23,20 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -59,6 +59,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; /** @@ -80,55 +81,23 @@ private final transient LongWritable deserialize_error_count = new LongWritable(); - private final Map opCtxMap = new HashMap(); - private final Map, MapOpCtx> childrenOpToOpCtxMap = - new HashMap, MapOpCtx>(); - - protected transient MapOpCtx current; - private transient List> extraChildrenToClose = null; - private final Map normalizedPaths = new HashMap(); - - private static class MapInputPath { - String path; - String alias; - Operator op; - PartitionDesc 
partDesc; - - /** - * @param path - * @param alias - * @param op - */ - public MapInputPath(String path, String alias, Operator op, PartitionDesc partDesc) { - this.path = path; - this.alias = alias; - this.op = op; - this.partDesc = partDesc; - } + // input path --> {operator --> context} + private final Map, MapOpCtx>> opCtxMap = + new HashMap, MapOpCtx>>(); + // child operator --> object inspector (converted OI if it's needed) + private final Map, StructObjectInspector> childrenOpToOI = + new HashMap, StructObjectInspector>(); - @Override - public boolean equals(Object o) { - if (o instanceof MapInputPath) { - MapInputPath mObj = (MapInputPath) o; - return path.equals(mObj.path) && alias.equals(mObj.alias) - && op.equals(mObj.op); - } - - return false; - } - - @Override - public int hashCode() { - int ret = (path == null) ? 0 : path.hashCode(); - ret += (alias == null) ? 0 : alias.hashCode(); - ret += (op == null) ? 0 : op.hashCode(); - return ret; - } - } + // context for current input file + protected transient Map, MapOpCtx> currentCtxs; + private transient final Map normalizedPaths = new HashMap(); protected static class MapOpCtx { - StructObjectInspector tblRawRowObjectInspector; // columns + final String alias; + final Operator op; + final PartitionDesc partDesc; + StructObjectInspector partObjectInspector; // partition columns StructObjectInspector vcsObjectInspector; // virtual columns StructObjectInspector rowObjectInspector; @@ -144,6 +113,12 @@ public int hashCode() { List vcs; Object[] vcValues; + public MapOpCtx(String alias, Operator op, PartitionDesc partDesc) { + this.alias = alias; + this.op = op; + this.partDesc = partDesc; + } + private boolean isPartitioned() { return partObjectInspector != null; } @@ -152,12 +127,30 @@ private boolean hasVC() { return vcsObjectInspector != null; } - private Object readRow(Writable value) throws SerDeException { - return partTblObjectInspectorConverter.convert(deserializer.deserialize(value)); + private Object readRow(Writable value, ExecMapperContext context) throws SerDeException { + Object deserialized = deserializer.deserialize(value); + Object row = partTblObjectInspectorConverter.convert(deserialized); + if (hasVC()) { + rowWithPartAndVC[0] = row; + if (context != null) { + populateVirtualColumnValues(context, vcs, vcValues, deserializer); + } + int vcPos = isPartitioned() ? 
2 : 1; + rowWithPartAndVC[vcPos] = vcValues; + return rowWithPartAndVC; + } else if (isPartitioned()) { + rowWithPart[0] = row; + return rowWithPart; + } + return row; } - public StructObjectInspector getRowObjectInspector() { - return rowObjectInspector; + public boolean forward(Object row) throws HiveException { + if (op.getDone()) { + return false; + } + op.processOp(row, 0); + return true; } } @@ -170,20 +163,20 @@ public StructObjectInspector getRowObjectInspector() { * @param mapWork * @throws HiveException */ - public void initializeAsRoot(Configuration hconf, MapWork mapWork) - throws HiveException { + @VisibleForTesting + void initializeAsRoot(JobConf hconf, MapWork mapWork) throws Exception { setConf(mapWork); setChildren(hconf); + setExecContext(new ExecMapperContext(hconf)); initialize(hconf, null); } - private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, - Map convertedOI) throws Exception { + private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx, + StructObjectInspector tableRowOI) throws Exception { - PartitionDesc pd = ctx.partDesc; + PartitionDesc pd = opCtx.partDesc; TableDesc td = pd.getTableDesc(); - MapOpCtx opCtx = new MapOpCtx(); // Use table properties in case of unpartitioned tables, // and the union of table properties and partition properties, with partition // taking precedence, in the case of partitioned tables @@ -194,18 +187,13 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, opCtx.tableName = String.valueOf(overlayedProps.getProperty("name")); opCtx.partName = String.valueOf(partSpec); - - Class serdeclass = hconf.getClassByName(pd.getSerdeClassName()); - opCtx.deserializer = (Deserializer) serdeclass.newInstance(); - SerDeUtils.initializeSerDe(opCtx.deserializer, hconf, td.getProperties(), pd.getProperties()); + opCtx.deserializer = pd.getDeserializer(hconf); StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) opCtx.deserializer.getObjectInspector(); - opCtx.tblRawRowObjectInspector = convertedOI.get(td); - - opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter( - partRawRowObjectInspector, opCtx.tblRawRowObjectInspector); + opCtx.partTblObjectInspectorConverter = + ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI); // Next check if this table has partitions and if so // get the list of partition names as well as allocate @@ -253,8 +241,8 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, // The op may not be a TableScan for mapjoins // Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key; // In that case, it will be a Select, but the rowOI need not be amended - if (ctx.op instanceof TableScanOperator) { - TableScanOperator tsOp = (TableScanOperator) ctx.op; + if (opCtx.op instanceof TableScanOperator) { + TableScanOperator tsOp = (TableScanOperator) opCtx.op; TableScanDesc tsDesc = tsOp.getConf(); if (tsDesc != null && tsDesc.hasVirtualCols()) { opCtx.vcs = tsDesc.getVirtualCols(); @@ -268,11 +256,11 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, } } if (!opCtx.hasVC() && !opCtx.isPartitioned()) { - opCtx.rowObjectInspector = opCtx.tblRawRowObjectInspector; + opCtx.rowObjectInspector = tableRowOI; return opCtx; } List inspectors = new ArrayList(); - inspectors.add(opCtx.tblRawRowObjectInspector); + inspectors.add(tableRowOI); if (opCtx.isPartitioned()) { inspectors.add(opCtx.partObjectInspector); } @@ -302,19 +290,14 
@@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx, for (String onefile : conf.getPathToAliases().keySet()) { PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile); TableDesc tableDesc = pd.getTableDesc(); - Properties tblProps = tableDesc.getProperties(); - Class sdclass = hconf.getClassByName(pd.getSerdeClassName()); - Deserializer partDeserializer = (Deserializer) sdclass.newInstance(); - SerDeUtils.initializeSerDe(partDeserializer, hconf, tblProps, pd.getProperties()); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); + Deserializer partDeserializer = pd.getDeserializer(hconf); + StructObjectInspector partRawRowObjectInspector = + (StructObjectInspector) partDeserializer.getObjectInspector(); StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc); if ((tblRawRowObjectInspector == null) || (identityConverterTableDesc.contains(tableDesc))) { - sdclass = hconf.getClassByName(tableDesc.getSerdeClassName()); - Deserializer tblDeserializer = (Deserializer) sdclass.newInstance(); - SerDeUtils.initializeSerDe(tblDeserializer, hconf, tblProps, null); + Deserializer tblDeserializer = tableDesc.getDeserializer(hconf); tblRawRowObjectInspector = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI( partRawRowObjectInspector, @@ -338,68 +321,83 @@ else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) { return tableDescOI; } - public void setChildren(Configuration hconf) throws HiveException { - Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath(); - - boolean schemeless = fpath.toUri().getScheme() == null; + public void setChildren(Configuration hconf) throws Exception { List> children = new ArrayList>(); Map convertedOI = getConvertedOI(hconf); - try { - for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { - String onefile = entry.getKey(); - List aliases = entry.getValue(); + for (Map.Entry> entry : conf.getPathToAliases().entrySet()) { + String onefile = entry.getKey(); + List aliases = entry.getValue(); + PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); - Path onepath = new Path(onefile); - if (schemeless) { - onepath = new Path(onepath.toUri().getPath()); + for (String alias : aliases) { + Operator op = conf.getAliasToWork().get(alias); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding alias " + alias + " to work list for file " + + onefile); + } + Map, MapOpCtx> contexts = opCtxMap.get(onefile); + if (contexts == null) { + opCtxMap.put(onefile, contexts = new LinkedHashMap, MapOpCtx>()); } + if (contexts.containsKey(op)) { + continue; + } + MapOpCtx context = new MapOpCtx(alias, op, partDesc); + StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc()); + contexts.put(op, initObjectInspector(hconf, context, tableRowOI)); - PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); + op.setParentOperators(new ArrayList>(1)); + op.getParentOperators().add(this); + children.add(op); + } + } - for (String onealias : aliases) { - Operator op = conf.getAliasToWork().get(onealias); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding alias " + onealias + " to work list for file " - + onefile); - } - MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); - if (opCtxMap.containsKey(inp)) { - continue; - } - MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI); - opCtxMap.put(inp, opCtx); - - op.setParentOperators(new ArrayList>()); - 
op.getParentOperators().add(this); - // check for the operators who will process rows coming to this Map - // Operator - if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { - children.add(op); - childrenOpToOpCtxMap.put(op, opCtx); - LOG.info("dump " + op + " " - + opCtxMap.get(inp).rowObjectInspector.getTypeName()); - } - current = opCtx; // just need for TestOperators.testMapOperator + initOperatorContext(children); + + // we found all the operators that we are supposed to process. + setChildOperators(children); + } + + private void initOperatorContext(List> children) + throws HiveException { + for (Map, MapOpCtx> contexts : opCtxMap.values()) { + for (MapOpCtx context : contexts.values()) { + if (!children.contains(context.op)) { + continue; + } + StructObjectInspector prev = + childrenOpToOI.put(context.op, context.rowObjectInspector); + if (prev != null && !prev.equals(context.rowObjectInspector)) { + throw new HiveException("Conflict on row inspector for " + context.alias); } + LOG.info("dump " + context.op + " " + context.rowObjectInspector.getTypeName()); } + } + } - if (children.size() == 0) { - // didn't find match for input file path in configuration! - // serious problem .. - LOG.error("Configuration does not have any alias for path: " - + fpath.toUri()); - throw new HiveException("Configuration and input path are inconsistent"); + private String getNominalPath(Path fpath) { + String nominal = null; + boolean schemaless = fpath.toUri().getScheme() == null; + for (String onefile : conf.getPathToAliases().keySet()) { + Path onepath = normalizePath(onefile, schemaless); + // check for the operators who will process rows coming to this Map Operator + if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { + // not from this + continue; } - - // we found all the operators that we are supposed to process. 
- setChildOperators(children); - } catch (Exception e) { - throw new HiveException(e); + if (nominal != null) { + throw new IllegalStateException("Ambiguous input path " + fpath); + } + nominal = onefile; + } + if (nominal == null) { + throw new IllegalStateException("Invalid input path " + fpath); } + return nominal; } @Override @@ -408,85 +406,47 @@ public void initializeOp(Configuration hconf) throws HiveException { state = State.INIT; statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count); - List> children = getChildOperators(); - - for (Entry, MapOpCtx> entry : childrenOpToOpCtxMap - .entrySet()) { - Operator child = entry.getKey(); - MapOpCtx mapOpCtx = entry.getValue(); - // Add alias, table name, and partitions to hadoop conf so that their - // children will inherit these - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName); - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName); - child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowObjectInspector}); - } - - for (Entry entry : opCtxMap.entrySet()) { - MapInputPath input = entry.getKey(); - MapOpCtx mapOpCtx = entry.getValue(); - // Add alias, table name, and partitions to hadoop conf so that their - // children will inherit these - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName); - HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName); - - Operator op = input.op; - if (children.indexOf(op) == -1) { - // op is not in the children list, so need to remember it and close it afterwards - if (extraChildrenToClose == null) { - extraChildrenToClose = new ArrayList>(); - } - extraChildrenToClose.add(op); - op.initialize(hconf, new ObjectInspector[] {entry.getValue().rowObjectInspector}); - } - } - } - - /** - * close extra child operators that are initialized but are not executed. 
- */ - @Override - public void closeOp(boolean abort) throws HiveException { - if (extraChildrenToClose != null) { - for (Operator op : extraChildrenToClose) { - op.close(abort); - } + for (Entry, StructObjectInspector> entry : childrenOpToOI.entrySet()) { + Operator child = entry.getKey(); + child.initialize(hconf, new ObjectInspector[] {entry.getValue()}); } } // Find context for current input file @Override public void cleanUpInputFileChangedOp() throws HiveException { + super.cleanUpInputFileChangedOp(); Path fpath = getExecContext().getCurrentInputPath(); - - for (String onefile : conf.getPathToAliases().keySet()) { - Path onepath = normalizePath(onefile); - // check for the operators who will process rows coming to this Map - // Operator - if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) { - // not from this - continue; - } - PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile); - for (String onealias : conf.getPathToAliases().get(onefile)) { - Operator op = conf.getAliasToWork().get(onealias); - MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc); - MapOpCtx context = opCtxMap.get(inp); - if (context != null) { - current = context; - LOG.info("Processing alias " + onealias + " for file " + onefile); - return; + String nominalPath = getNominalPath(fpath); + currentCtxs = opCtxMap.get(nominalPath); + if (isLogInfoEnabled) { + StringBuilder builder = new StringBuilder(); + for (MapOpCtx context : currentCtxs.values()) { + if (builder.length() > 0) { + builder.append(", "); } + builder.append(context.alias); } + LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath); + } + // Add alias, table name, and partitions to hadoop conf so that their + // children will inherit these + for (Entry, MapOpCtx> entry : currentCtxs.entrySet()) { + Operator operator = entry.getKey(); + MapOpCtx context = entry.getValue(); + operator.setInputContext(nominalPath, context.tableName, context.partName); } - throw new IllegalStateException("Invalid path " + fpath); } - private Path normalizePath(String onefile) { + private Path normalizePath(String onefile, boolean schemaless) { //creating Path is expensive, so cache the corresponding //Path object in normalizedPaths Path path = normalizedPaths.get(onefile); - if(path == null){ + if (path == null) { path = new Path(onefile); + if (schemaless && path.toUri().getScheme() != null) { + path = new Path(path.toUri().getPath()); + } normalizedPaths.put(onefile, path); } return path; @@ -500,50 +460,37 @@ public void process(Writable value) throws HiveException { // The child operators cleanup if input file has changed cleanUpInputFileChanged(); } - Object row; - try { - row = current.readRow(value); - if (current.hasVC()) { - current.rowWithPartAndVC[0] = row; - if (context != null) { - populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer); - } - int vcPos = current.isPartitioned() ? 2 : 1; - current.rowWithPartAndVC[vcPos] = current.vcValues; - row = current.rowWithPartAndVC; - } else if (current.isPartitioned()) { - current.rowWithPart[0] = row; - row = current.rowWithPart; - } - } catch (Exception e) { - // Serialize the row and output. 
- String rawRowString; + int childrenDone = 0; + for (MapOpCtx current : currentCtxs.values()) { + Object row = null; try { - rawRowString = value.toString(); - } catch (Exception e2) { - rawRowString = "[Error getting row data with exception " + - StringUtils.stringifyException(e2) + " ]"; + row = current.readRow(value, context); + if (!current.forward(row)) { + childrenDone++; + } + } catch (Exception e) { + // TODO: policy on deserialization errors + String message = toErrorMessage(value, row, current.rowObjectInspector); + if (row == null) { + deserialize_error_count.set(deserialize_error_count.get() + 1); + throw new HiveException("Hive Runtime Error while processing writable " + message, e); + } + throw new HiveException("Hive Runtime Error while processing row " + message, e); } - - // TODO: policy on deserialization errors - deserialize_error_count.set(deserialize_error_count.get() + 1); - throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e); } + if (childrenDone == currentCtxs.size()) { + setDone(true); + } + } - // The row has been converted to comply with table schema, irrespective of partition schema. - // So, use tblOI (and not partOI) for forwarding + private String toErrorMessage(Writable value, Object row, ObjectInspector inspector) { try { - forward(row, current.rowObjectInspector); - } catch (Exception e) { - // Serialize the row and output the error message. - String rowString; - try { - rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector); - } catch (Exception e2) { - rowString = "[Error getting row data with exception " + - StringUtils.stringifyException(e2) + " ]"; + if (row != null) { + return SerDeUtils.getJSONString(row, inspector); } - throw new HiveException("Hive Runtime Error while processing row " + rowString, e); + return String.valueOf(value); + } catch (Exception e) { + return "[Error getting row data with exception " + StringUtils.stringifyException(e) + " ]"; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 3dc7c76..140156c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -213,8 +213,11 @@ public RowSchema getSchema() { protected transient HashMap, LongWritable> statsMap = new HashMap, LongWritable>(); @SuppressWarnings("rawtypes") protected transient OutputCollector out; - protected transient Log LOG = LogFactory.getLog(this.getClass().getName()); - protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled(); + protected transient final Log LOG = LogFactory.getLog(getClass().getName()); + protected transient final Log PLOG = LogFactory.getLog(Operator.class.getName()); // for simple disabling logs from all operators + protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled(); + protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled(); + protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled(); protected transient String alias; protected transient Reporter reporter; protected transient String id; @@ -490,33 +493,45 @@ public ObjectInspector getOutputObjInspector() { public abstract void processOp(Object row, int tag) throws HiveException; protected final void defaultStartGroup() throws HiveException { - LOG.debug("Starting group"); + if (isLogDebugEnabled) { + LOG.debug("Starting group"); + } if 
(childOperators == null) { return; } - LOG.debug("Starting group for children:"); + if (isLogDebugEnabled) { + LOG.debug("Starting group for children:"); + } for (Operator op : childOperators) { op.startGroup(); } - LOG.debug("Start group Done"); + if (isLogDebugEnabled) { + LOG.debug("Start group Done"); + } } protected final void defaultEndGroup() throws HiveException { - LOG.debug("Ending group"); + if (isLogDebugEnabled) { + LOG.debug("Ending group"); + } if (childOperators == null) { return; } - LOG.debug("Ending group for children:"); + if (isLogDebugEnabled) { + LOG.debug("Ending group for children:"); + } for (Operator op : childOperators) { op.endGroup(); } - LOG.debug("End group Done"); + if (isLogDebugEnabled) { + LOG.debug("End group Done"); + } } // If a operator wants to do some work at the beginning of a group @@ -1046,6 +1061,17 @@ public void cleanUpInputFileChanged() throws HiveException { public void cleanUpInputFileChangedOp() throws HiveException { } + // called by map operator. propagated recursively to single parented descendants + public void setInputContext(String inputPath, String tableName, String partitionName) { + if (childOperators != null) { + for (Operator child : childOperators) { + if (child.getNumParent() == 1) { + child.setInputContext(inputPath, tableName, partitionName); + } + } + } + } + public boolean supportSkewJoinOptimization() { return false; } @@ -1263,7 +1289,7 @@ public OpTraits getOpTraits() { } public void setOpTraits(OpTraits metaInfo) { - if (LOG.isDebugEnabled()) { + if (isLogDebugEnabled) { LOG.debug("Setting traits ("+metaInfo+") on "+this); } if (conf != null) { @@ -1274,7 +1300,7 @@ public void setOpTraits(OpTraits metaInfo) { } public void setStatistics(Statistics stats) { - if (LOG.isDebugEnabled()) { + if (isLogDebugEnabled) { LOG.debug("Setting stats ("+stats+") on "+this); } if (conf != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index d8698da..4e38ded 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -65,10 +63,6 @@ PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex"); } - private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName()); - private static final boolean isInfoEnabled = LOG.isInfoEnabled(); - private static final boolean isDebugEnabled = LOG.isDebugEnabled(); - private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final long serialVersionUID = 1L; private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); @@ -149,7 +143,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { try { List keys = conf.getKeyCols(); - if (isDebugEnabled) { + if (isLogDebugEnabled) { LOG.debug("keys size is " + keys.size()); for (ExprNodeDesc k : keys) { LOG.debug("Key exprNodeDesc " + k.getExprString()); @@ -194,7 +188,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { tag = conf.getTag(); tagByte[0] = (byte) tag; skipTag = conf.getSkipTag(); - if (isInfoEnabled) { + if (isLogInfoEnabled) { LOG.info("Using 
tag = " + tag); } @@ -296,7 +290,7 @@ public void processOp(Object row, int tag) throws HiveException { bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); } - if (isInfoEnabled) { + if (isLogInfoEnabled) { LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys()); } @@ -342,7 +336,7 @@ public void processOp(Object row, int tag) throws HiveException { if (autoParallel && partitionEval.length > 0) { hashCode = computeMurmurHash(firstKey); } else { - hashCode = computeHashCode(row); + hashCode = computeHashCode(row, bucketNumber); } firstKey.setHashCode(hashCode); @@ -391,7 +385,7 @@ private int computeBucketNumber(Object row, int numBuckets) throws HiveException // column directly. Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField); buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField)); - if (isTraceEnabled) { + if (isLogTraceEnabled) { LOG.trace("Acid choosing bucket number " + buckNum); } } else { @@ -438,7 +432,7 @@ protected final int computeMurmurHash(HiveKey firstKey) { return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0); } - private int computeHashCode(Object row) throws HiveException { + private int computeHashCode(Object row, int buckNum) throws HiveException { // Evaluate the HashCode int keyHashCode = 0; if (partitionEval.length == 0) { @@ -462,10 +456,11 @@ private int computeHashCode(Object row) throws HiveException { + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]); } } - if (isTraceEnabled) { - LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber)); + int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum; + if (isLogTraceEnabled) { + LOG.trace("Going to return hash code " + hashCode); } - return bucketNumber < 0 ? 
keyHashCode : keyHashCode * 31 + bucketNumber; + return hashCode; } private boolean partitionKeysAreNull(Object row) throws HiveException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index 8228e09..be85a53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -283,6 +283,16 @@ void displayBrokenPipeInfo() { return; } + private transient String tableName; + private transient String partitionName ; + + @Override + public void setInputContext(String inputPath, String tableName, String partitionName) { + this.tableName = tableName; + this.partitionName = partitionName; + super.setInputContext(inputPath, tableName, partitionName); + } + @Override public void processOp(Object row, int tag) throws HiveException { // initialize the user's process only when you receive the first row @@ -306,10 +316,8 @@ public void processOp(Object row, int tag) throws HiveException { String[] wrappedCmdArgs = addWrapper(cmdArgs); LOG.info("Executing " + Arrays.asList(wrappedCmdArgs)); - LOG.info("tablename=" - + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname)); - LOG.info("partname=" - + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname)); + LOG.info("tablename=" + tableName); + LOG.info("partname=" + partitionName); LOG.info("alias=" + alias); ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 155002a..7443f8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -2042,15 +2042,21 @@ public static String getResourceFiles(Configuration conf, SessionState.ResourceT public static ClassLoader getSessionSpecifiedClassLoader() { SessionState state = SessionState.get(); if (state == null || state.getConf() == null) { - LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead"); + if (LOG.isDebugEnabled()) { + LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead"); + } return JavaUtils.getClassLoader(); } ClassLoader sessionCL = state.getConf().getClassLoader(); - if (sessionCL != null){ - LOG.debug("Use session specified class loader"); + if (sessionCL != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Use session specified class loader"); + } return sessionCL; } - LOG.debug("Session specified class loader not found, use thread based class loader"); + if (LOG.isDebugEnabled()) { + LOG.debug("Session specified class loader not found, use thread based class loader"); + } return JavaUtils.getClassLoader(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 311f6d6..ac4c0f7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -40,7 +40,15 @@ public void process(Writable value) throws HiveException { // The row has been converted to comply with table schema, irrespective of partition schema. 
// So, use tblOI (and not partOI) for forwarding try { - forward(value, current.getRowObjectInspector()); + int childrenDone = 0; + for (MapOpCtx current : currentCtxs.values()) { + if (!current.forward(value)) { + childrenDone++; + } + } + if (childrenDone == currentCtxs.size()) { + setDone(true); + } } catch (Exception e) { throw new HiveException("Hive Runtime Error while processing row ", e); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java index 78d4d1f..62e3deb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -76,12 +77,16 @@ public TableDesc( return inputFileFormatClass; } + public Deserializer getDeserializer() throws Exception { + return getDeserializer(null); + } + /** * Return a deserializer object corresponding to the tableDesc. */ - public Deserializer getDeserializer() throws Exception { + public Deserializer getDeserializer(Configuration conf) throws Exception { Deserializer de = getDeserializerClass().newInstance(); - SerDeUtils.initializeSerDe(de, null, properties, null); + SerDeUtils.initializeSerDe(de, conf, properties, null); return de; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index 90e4cad..e694156 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.File; import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -32,7 +29,6 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; @@ -49,8 +45,6 @@ import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.ql.processors.CommandProcessor; -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -60,12 +54,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.junit.Test; @@ -328,7 +319,7 @@ public void testMapOperator() throws Throwable { try { System.out.println("Testing Map Operator"); // initialize configuration - 
Configuration hconf = new JobConf(TestOperators.class); + JobConf hconf = new JobConf(TestOperators.class); HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME, "hdfs:///testDir/testFile"); IOContext.get(hconf.get(Utilities.INPUT_NAME)).setInputPath( diff --git ql/src/test/queries/clientpositive/join_vc.q ql/src/test/queries/clientpositive/join_vc.q index 63b3da7..8d7dea9 100644 --- ql/src/test/queries/clientpositive/join_vc.q +++ ql/src/test/queries/clientpositive/join_vc.q @@ -3,3 +3,10 @@ explain select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3; select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3; + +explain +select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100; + +select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100; diff --git ql/src/test/results/clientpositive/join_vc.q.out ql/src/test/results/clientpositive/join_vc.q.out index 12004ca..58e1450 100644 --- ql/src/test/results/clientpositive/join_vc.q.out +++ ql/src/test/results/clientpositive/join_vc.q.out @@ -137,3 +137,227 @@ POSTHOOK: Input: default@src 0 238 val_238 0 238 val_238 0 238 val_238 +PREHOOK: query: explain +select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: t2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 100)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + value expressions: BLOCK__OFFSET__INSIDE__FILE (type: bigint) + TableScan + alias: t1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (key is not null and (key < 100)) (type: boolean) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {VALUE._col1} + outputColumnNames: _col7 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col7 (type: bigint) + outputColumnNames: _col0 + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output 
format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE +from src t1 join src t2 on t1.key = t2.key where t1.key < 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +2088 +2632 +968 +2088 +2632 +968 +2088 +2632 +968 +2846 +3170 +1720 +4362 +1720 +4362 +386 +2770 +386 +2770 +910 +5340 +5514 +5340 +5514 +2824 +4004 +1118 +4594 +1972 +4594 +1972 +2226 +5284 +2226 +5284 +34 +5616 +3494 +3592 +3192 +3138 +4012 +1238 +3138 +4012 +1238 +3138 +4012 +1238 +5626 +328 +5626 +328 +1218 +3388 +2030 +3298 +2030 +3298 +2330 +4068 +1198 +3060 +4540 +3864 +3060 +4540 +3864 +3060 +4540 +3864 +2308 +1462 +2308 +1462 +4186 +1440 +1024 +1906 +3128 +1906 +3128 +3516 +1592 +198 +1754 +5306 +1754 +5306 +3570 +3794 +4640 +4548 +3794 +4640 +4548 +3794 +4640 +4548 +2792 +1208 +2792 +1208 +3548 +3378 +3538 +3378 +3538 +2622 +3368 +1916 +4058 +396 +5070 +1674 +5070 +1674 +1872 +5606 +1872 +5606 +2612 +12 +2652 +5398 +2802 +5744 +4304 +2802 +5744 +4304 +2802 +5744 +4304 +1176 +3160 +2400 +3160 +2400 +2216 +5572 +5802 +5572 +5802 +92 +2458 +92 +2458
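
Note on the MapOperator change above, as a minimal hypothetical sketch rather than the actual Hive classes: the patch replaces the old per-(path, alias) MapInputPath bookkeeping and the single "current" context with a map of input path -> (child operator -> MapOpCtx). When the input file changes, the contexts registered for that path become the current set; each row is read once per context and pushed through MapOpCtx.forward(), and the map operator marks itself done as soon as every child for the current file reports done. The sketch below illustrates only that routing and done-counting pattern; RowRoutingSketch, ChildOp, OpCtx, register and onInputFileChanged are invented names and signatures, not Hive APIs.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowRoutingSketch {

  /** Stand-in for a child operator (e.g. a TableScan) fed by the map-side root. */
  interface ChildOp {
    boolean isDone();
    void process(Object row);
  }

  /** Stand-in for MapOpCtx: an alias plus the child operator it feeds. */
  static final class OpCtx {
    final String alias;
    final ChildOp op;

    OpCtx(String alias, ChildOp op) {
      this.alias = alias;
      this.op = op;
    }

    /** Returns false once the child no longer wants input, mirroring MapOpCtx.forward(). */
    boolean forward(Object row) {
      if (op.isDone()) {
        return false;
      }
      op.process(row);
      return true;
    }
  }

  // input path --> (child operator --> context)
  private final Map<String, Map<ChildOp, OpCtx>> opCtxMap =
      new LinkedHashMap<String, Map<ChildOp, OpCtx>>();

  // contexts for the input file currently being read
  private List<OpCtx> currentCtxs = new ArrayList<OpCtx>();
  private boolean done;

  void register(String path, String alias, ChildOp op) {
    Map<ChildOp, OpCtx> contexts = opCtxMap.get(path);
    if (contexts == null) {
      opCtxMap.put(path, contexts = new LinkedHashMap<ChildOp, OpCtx>());
    }
    contexts.put(op, new OpCtx(alias, op));
  }

  /** Called when the framework switches to a new input file. */
  void onInputFileChanged(String path) {
    Map<ChildOp, OpCtx> contexts = opCtxMap.get(path);
    if (contexts == null) {
      throw new IllegalStateException("Invalid input path " + path);
    }
    currentCtxs = new ArrayList<OpCtx>(contexts.values());
  }

  /** Forward one row to every context of the current file; finish once all children are done. */
  void process(Object row) {
    int childrenDone = 0;
    for (OpCtx ctx : currentCtxs) {
      if (!ctx.forward(row)) {
        childrenDone++;
      }
    }
    if (childrenDone == currentCtxs.size()) {
      done = true;
    }
  }

  boolean isDone() {
    return done;
  }

  public static void main(String[] args) {
    RowRoutingSketch mapOp = new RowRoutingSketch();
    ChildOp scanA = new ChildOp() {
      int seen = 0;
      public boolean isDone() { return seen >= 2; }              // stops wanting input after two rows
      public void process(Object row) { seen++; System.out.println("a: " + row); }
    };
    ChildOp scanB = new ChildOp() {
      public boolean isDone() { return false; }
      public void process(Object row) { System.out.println("b: " + row); }
    };
    mapOp.register("/warehouse/t1/p=1", "a", scanA);
    mapOp.register("/warehouse/t1/p=1", "b", scanB);
    mapOp.onInputFileChanged("/warehouse/t1/p=1");
    for (int i = 0; i < 4 && !mapOp.isDone(); i++) {
      mapOp.process("row-" + i);
    }
  }
}

For context on the surrounding hunks: the patch also drops the hive.table.name / hive.partition.name round-trip through the job configuration in favour of Operator.setInputContext(), and replaces repeated LOG.isDebugEnabled()-style calls with cached isLogInfoEnabled / isLogDebugEnabled / isLogTraceEnabled flags that additionally consult a shared Operator-level logger (PLOG), so logging from all operators can be toned down in one place.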