diff --git data/conf/hive-log4j.properties data/conf/hive-log4j.properties
index 7f5dfc4..7336b0b 100644
--- data/conf/hive-log4j.properties
+++ data/conf/hive-log4j.properties
@@ -75,6 +75,12 @@ log4j.category.JPOX.Query=ERROR,DRFA
 log4j.category.JPOX.General=ERROR,DRFA
 log4j.category.JPOX.Enhancer=ERROR,DRFA
 log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA
+log4j.logger.org.apache.zookeeper=INFO,DRFA
+log4j.logger.org.apache.zookeeper.server.ServerCnxn=WARN,DRFA
 log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocket=WARN,DRFA
 log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
-
+log4j.logger.org.apache.hadoop.hive.metastore.ObjectStore=WARN,DRFA
+log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=WARN,DRFA
+log4j.logger.org.apache.hadoop.hive.ql.exec.Operator=INFO,DRFA
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index f624bf4..ec856b8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -80,54 +79,31 @@ private final transient LongWritable deserialize_error_count = new LongWritable();
-  private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
-  private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
-      new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
+  private final Map<String, List<MapOpCtx>> opCtxMap = new HashMap<String, List<MapOpCtx>>();
+  private final Map<Operator<? extends OperatorDesc>, MapOpMeta> childrenOpToOpMetaCtx =
+      new HashMap<Operator<? extends OperatorDesc>, MapOpMeta>();

-  protected transient MapOpCtx current;
-  private transient List<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
+  protected transient List<MapOpCtx> currentCtxs;

   private final Map<String, Path> normalizedPaths = new HashMap<String, Path>();

-  private static class MapInputPath {
-    String path;
-    String alias;
-    Operator<? extends OperatorDesc> op;
-    PartitionDesc partDesc;
-
-    /**
-     * @param path
-     * @param alias
-     * @param op
-     */
-    public MapInputPath(String path, String alias, Operator<? extends OperatorDesc> op, PartitionDesc partDesc) {
-      this.path = path;
-      this.alias = alias;
-      this.op = op;
-      this.partDesc = partDesc;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof MapInputPath) {
-        MapInputPath mObj = (MapInputPath) o;
-        return path.equals(mObj.path) && alias.equals(mObj.alias)
-            && op.equals(mObj.op);
-      }
-
-      return false;
-    }
+  protected static class MapOpMeta {
+    private final String tableName;
+    private final String partName;
+    private final StructObjectInspector rowOI;

-    @Override
-    public int hashCode() {
-      int ret = (path == null) ? 0 : path.hashCode();
-      ret += (alias == null) ? 0 : alias.hashCode();
-      ret += (op == null) ? 0 : op.hashCode();
-      return ret;
+    public MapOpMeta(String tableName, String partName, StructObjectInspector rowOI) {
+      this.tableName = tableName;
+      this.partName = partName;
+      this.rowOI = rowOI;
     }
   }

   protected static class MapOpCtx {
+    final String alias;
+    final Operator<? extends OperatorDesc> op;
+    final PartitionDesc partDesc;
+
     StructObjectInspector tblRawRowObjectInspector; // columns
     StructObjectInspector partObjectInspector; // partition columns
     StructObjectInspector vcsObjectInspector; // virtual columns
@@ -144,6 +120,12 @@ public int hashCode() {
     List<VirtualColumn> vcs;
     Object[] vcValues;

+    public MapOpCtx(String alias, Operator<? extends OperatorDesc> op, PartitionDesc partDesc) {
+      this.alias = alias;
+      this.op = op;
+      this.partDesc = partDesc;
+    }
+
     private boolean isPartitioned() {
       return partObjectInspector != null;
     }
@@ -159,6 +141,34 @@ private Object readRow(Writable value) throws SerDeException {
     public StructObjectInspector getRowObjectInspector() {
       return rowObjectInspector;
     }
+
+    public boolean forward(Object row) throws HiveException {
+      if (op.getDone()) {
+        return false;
+      }
+      op.processOp(row, 0);
+      return true;
+    }
+
+    public boolean equals(Object o) {
+      if (o instanceof MapOpCtx) {
+        MapOpCtx mObj = (MapOpCtx) o;
+        return alias.equals(mObj.alias) && op.equals(mObj.op);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      int ret = (alias == null) ? 0 : alias.hashCode();
+      ret += (op == null) ? 0 : op.hashCode();
+      return ret;
+    }
+
+    @Override
+    public String toString() {
+      return alias + "[" + op + "]";
+    }
   }

   /**
@@ -171,19 +181,18 @@ public StructObjectInspector getRowObjectInspector() {
    * @throws HiveException
    */
   public void initializeAsRoot(Configuration hconf, MapWork mapWork)
-      throws HiveException {
+      throws Exception {
     setConf(mapWork);
     setChildren(hconf);
     initialize(hconf, null);
   }

-  private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
+  private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx ctx,
       Map<TableDesc, StructObjectInspector> convertedOI) throws Exception {

     PartitionDesc pd = ctx.partDesc;
     TableDesc td = pd.getTableDesc();

-    MapOpCtx opCtx = new MapOpCtx();
     // Use table properties in case of unpartitioned tables,
     // and the union of table properties and partition properties, with partition
     // taking precedence, in the case of partitioned tables
@@ -192,20 +201,20 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
     Map<String, String> partSpec = pd.getPartSpec();

-    opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
-    opCtx.partName = String.valueOf(partSpec);
+    ctx.tableName = String.valueOf(overlayedProps.getProperty("name"));
+    ctx.partName = String.valueOf(partSpec);

     Class serdeclass = hconf.getClassByName(pd.getSerdeClassName());
-    opCtx.deserializer = (Deserializer) serdeclass.newInstance();
-    SerDeUtils.initializeSerDe(opCtx.deserializer, hconf, td.getProperties(), pd.getProperties());
+    ctx.deserializer = (Deserializer) serdeclass.newInstance();
+    SerDeUtils.initializeSerDe(ctx.deserializer, hconf, td.getProperties(), pd.getProperties());

     StructObjectInspector partRawRowObjectInspector =
-        (StructObjectInspector) opCtx.deserializer.getObjectInspector();
+        (StructObjectInspector) ctx.deserializer.getObjectInspector();

-    opCtx.tblRawRowObjectInspector = convertedOI.get(td);
+    ctx.tblRawRowObjectInspector = convertedOI.get(td);

-    opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-        partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
+    ctx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+        partRawRowObjectInspector, ctx.tblRawRowObjectInspector);

     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
@@ -245,8 +254,8 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
         }
         partObjectInspectors.add(oi);
       }
-      opCtx.rowWithPart = new Object[] {null, partValues};
-      opCtx.partObjectInspector = ObjectInspectorFactory
+      ctx.rowWithPart = new Object[] {null, partValues};
+      ctx.partObjectInspector = ObjectInspectorFactory
           .getStandardStructObjectInspector(partNames, partObjectInspectors);
     }

@@ -257,30 +266,30 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
       TableScanOperator tsOp = (TableScanOperator) ctx.op;
       TableScanDesc tsDesc = tsOp.getConf();
       if (tsDesc != null && tsDesc.hasVirtualCols()) {
-        opCtx.vcs = tsDesc.getVirtualCols();
-        opCtx.vcValues = new Object[opCtx.vcs.size()];
-        opCtx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(opCtx.vcs);
-        if (opCtx.isPartitioned()) {
-          opCtx.rowWithPartAndVC = Arrays.copyOfRange(opCtx.rowWithPart, 0, 3);
+        ctx.vcs = tsDesc.getVirtualCols();
+        ctx.vcValues = new Object[ctx.vcs.size()];
+        ctx.vcsObjectInspector = VirtualColumn.getVCSObjectInspector(ctx.vcs);
+        if (ctx.isPartitioned()) {
+          ctx.rowWithPartAndVC = Arrays.copyOfRange(ctx.rowWithPart, 0, 3);
         } else {
-          opCtx.rowWithPartAndVC = new Object[2];
+          ctx.rowWithPartAndVC = new Object[2];
         }
       }
     }
-    if (!opCtx.hasVC() && !opCtx.isPartitioned()) {
-      opCtx.rowObjectInspector = opCtx.tblRawRowObjectInspector;
-      return opCtx;
+    if (!ctx.hasVC() && !ctx.isPartitioned()) {
+      ctx.rowObjectInspector = ctx.tblRawRowObjectInspector;
+      return ctx;
     }
     List<StructObjectInspector> inspectors = new ArrayList<StructObjectInspector>();
-    inspectors.add(opCtx.tblRawRowObjectInspector);
-    if (opCtx.isPartitioned()) {
-      inspectors.add(opCtx.partObjectInspector);
+    inspectors.add(ctx.tblRawRowObjectInspector);
+    if (ctx.isPartitioned()) {
+      inspectors.add(ctx.partObjectInspector);
     }
-    if (opCtx.hasVC()) {
-      inspectors.add(opCtx.vcsObjectInspector);
+    if (ctx.hasVC()) {
+      inspectors.add(ctx.vcsObjectInspector);
     }
-    opCtx.rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(inspectors);
-    return opCtx;
+    ctx.rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(inspectors);
+    return ctx;
   }

   // Return the mapping for table descriptor to the expected table OI
@@ -338,68 +347,89 @@ else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
     return tableDescOI;
   }

-  public void setChildren(Configuration hconf) throws HiveException {
-    Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
-
-    boolean schemeless = fpath.toUri().getScheme() == null;
+  public void setChildren(Configuration hconf) throws Exception {

     List<Operator<? extends OperatorDesc>> children = new ArrayList<Operator<? extends OperatorDesc>>();

     Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);

-    try {
-      for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
-        String onefile = entry.getKey();
-        List<String> aliases = entry.getValue();
+    for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+      String onefile = entry.getKey();
+      List<String> aliases = entry.getValue();
+      PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);

-        Path onepath = new Path(onefile);
-        if (schemeless) {
-          onepath = new Path(onepath.toUri().getPath());
+      for (String alias : aliases) {
+        Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding alias " + alias + " to work list for file "
+              + onefile);
+        }
+        List<MapOpCtx> contexts = opCtxMap.get(onefile);
+        if (contexts == null) {
+          opCtxMap.put(onefile, contexts = new ArrayList<MapOpCtx>());
+        }
+        MapOpCtx context = new MapOpCtx(alias, op, partDesc);
+        if (contexts.contains(context)) {
+          continue;
         }
+        contexts.add(initObjectInspector(hconf, context, convertedOI));

-        PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
+        op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));
+        op.getParentOperators().add(this);
+        children.add(op);
+      }
+    }

-        for (String onealias : aliases) {
-          Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding alias " + onealias + " to work list for file "
-                + onefile);
-          }
-          MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
-          if (opCtxMap.containsKey(inp)) {
-            continue;
-          }
-          MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI);
-          opCtxMap.put(inp, opCtx);
-
-          op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
-          op.getParentOperators().add(this);
-          // check for the operators who will process rows coming to this Map
-          // Operator
-          if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
-            children.add(op);
-            childrenOpToOpCtxMap.put(op, opCtx);
-            LOG.info("dump " + op + " "
-                + opCtxMap.get(inp).rowObjectInspector.getTypeName());
+    for (Operator<? extends OperatorDesc> child : conf.getAliasToWork().values()) {
+      MapOpMeta metaContext = getMetaContext(child);
+      if (metaContext != null) {
+        LOG.info("dump " + child + " " + metaContext.rowOI.getTypeName());
+        childrenOpToOpMetaCtx.put(child, metaContext);
+      }
+    }
+
+    // we found all the operators that we are supposed to process.
+    setChildOperators(children);
+  }
+
+  private MapOpMeta getMetaContext(Operator<? extends OperatorDesc> child) throws HiveException {
+    MapOpMeta opMeta = null;
+    for (Map.Entry<String, List<MapOpCtx>> contexts : opCtxMap.entrySet()) {
+      String path = contexts.getKey();
+      for (MapOpCtx context : contexts.getValue()) {
+        if (context.op.equals(child)) {
+          if (opMeta == null) {
+            opMeta = new MapOpMeta(context.tableName, context.partName, context.rowObjectInspector);
+          } else if (!opMeta.rowOI.equals(context.rowObjectInspector)) {
+            throw new HiveException("Conflict on row inspector for " + context);
           }
-          current = opCtx; // just need for TestOperators.testMapOperator
         }
       }
+    }
+    return opMeta;
+  }

-      if (children.size() == 0) {
-        // didn't find match for input file path in configuration!
-        // serious problem ..
-        LOG.error("Configuration does not have any alias for path: "
-            + fpath.toUri());
-        throw new HiveException("Configuration and input path are inconsistent");
+  private String getNominalPath(Path fpath) {
+    String nominal = null;
+    boolean schemeless = fpath.toUri().getScheme() == null;
+    for (String onefile : conf.getPathToAliases().keySet()) {
+      Path onepath = normalizePath(onefile, schemeless);
+      // check for the operators who will process rows coming to this Map
+      // Operator
+      if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+        // not from this
+        continue;
       }
-
-      // we found all the operators that we are supposed to process.
-      setChildOperators(children);
-    } catch (Exception e) {
-      throw new HiveException(e);
+      if (nominal != null) {
+        throw new IllegalStateException("Ambiguous input path " + fpath);
+      }
+      nominal = onefile;
+    }
+    if (nominal == null) {
+      throw new IllegalStateException("Invalid input path " + fpath);
     }
+    return nominal;
   }

   @Override
@@ -410,46 +440,15 @@ public void initializeOp(Configuration hconf) throws HiveException {

     List<Operator<? extends OperatorDesc>> children = getChildOperators();

-    for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
+    for (Entry<Operator<? extends OperatorDesc>, MapOpMeta> entry : childrenOpToOpMetaCtx
         .entrySet()) {
       Operator<? extends OperatorDesc> child = entry.getKey();
-      MapOpCtx mapOpCtx = entry.getValue();
-      // Add alias, table name, and partitions to hadoop conf so that their
-      // children will inherit these
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
-      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-      child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowObjectInspector});
-    }
-
-    for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
-      MapInputPath input = entry.getKey();
-      MapOpCtx mapOpCtx = entry.getValue();
+      MapOpMeta mapOpCtx = entry.getValue();
       // Add alias, table name, and partitions to hadoop conf so that their
       // children will inherit these
       HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
       HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-
-      Operator<? extends OperatorDesc> op = input.op;
-      if (children.indexOf(op) == -1) {
-        // op is not in the children list, so need to remember it and close it afterwards
-        if (extraChildrenToClose == null) {
-          extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
-        }
-        extraChildrenToClose.add(op);
-        op.initialize(hconf, new ObjectInspector[] {entry.getValue().rowObjectInspector});
-      }
-    }
-  }
-
-  /**
-   * close extra child operators that are initialized but are not executed.
-   */
-  @Override
-  public void closeOp(boolean abort) throws HiveException {
-    if (extraChildrenToClose != null) {
-      for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
-        op.close(abort);
-      }
+      child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowOI});
     }
   }

@@ -457,36 +456,26 @@ public void closeOp(boolean abort) throws HiveException {
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
     Path fpath = getExecContext().getCurrentInputPath();
-
-    for (String onefile : conf.getPathToAliases().keySet()) {
-      Path onepath = normalizePath(onefile);
-      // check for the operators who will process rows coming to this Map
-      // Operator
-      if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
-        // not from this
-        continue;
-      }
-      PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
-      for (String onealias : conf.getPathToAliases().get(onefile)) {
-        Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
-        MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
-        MapOpCtx context = opCtxMap.get(inp);
-        if (context != null) {
-          current = context;
-          LOG.info("Processing alias " + onealias + " for file " + onefile);
-          return;
-        }
+    currentCtxs = opCtxMap.get(getNominalPath(fpath));
+    StringBuilder builder = new StringBuilder();
+    for (MapOpCtx context : currentCtxs) {
+      if (builder.length() > 0) {
+        builder.append(", ");
       }
+      builder.append(context.alias);
     }
-    throw new IllegalStateException("Invalid path " + fpath);
+    LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath);
   }

-  private Path normalizePath(String onefile) {
+  private Path normalizePath(String onefile, boolean schemeless) {
     //creating Path is expensive, so cache the corresponding
     //Path object in normalizedPaths
     Path path = normalizedPaths.get(onefile);
-    if(path == null){
+    if (path == null) {
       path = new Path(onefile);
+      if (schemeless && path.toUri().getScheme() != null) {
+        path = new Path(path.toUri().getPath());
+      }
       normalizedPaths.put(onefile, path);
     }
     return path;
@@ -500,50 +489,51 @@ public void process(Writable value) throws HiveException {
       // The child operators cleanup if input file has changed
       cleanUpInputFileChanged();
     }
-    Object row;
-    try {
-      row = current.readRow(value);
-      if (current.hasVC()) {
-        current.rowWithPartAndVC[0] = row;
-        if (context != null) {
-          populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer);
-        }
-        int vcPos = current.isPartitioned() ? 2 : 1;
-        current.rowWithPartAndVC[vcPos] = current.vcValues;
-        row = current.rowWithPartAndVC;
-      } else if (current.isPartitioned()) {
-        current.rowWithPart[0] = row;
-        row = current.rowWithPart;
-      }
-    } catch (Exception e) {
-      // Serialize the row and output.
-      String rawRowString;
+    int childrenDone = 0;
+    for (MapOpCtx current : currentCtxs) {
+      Object row = null;
       try {
-        rawRowString = value.toString();
-      } catch (Exception e2) {
-        rawRowString = "[Error getting row data with exception " +
-            StringUtils.stringifyException(e2) + " ]";
+        row = current.readRow(value);
+        if (current.hasVC()) {
+          current.rowWithPartAndVC[0] = row;
+          if (context != null) {
+            populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer);
+          }
+          int vcPos = current.isPartitioned() ? 2 : 1;
+          current.rowWithPartAndVC[vcPos] = current.vcValues;
+          row = current.rowWithPartAndVC;
+        } else if (current.isPartitioned()) {
+          current.rowWithPart[0] = row;
+          row = current.rowWithPart;
+        }
+        if (!current.forward(row)) {
+          childrenDone++;
+        }
+      } catch (Exception e) {
+        // TODO: policy on deserialization errors
+        if (row == null) {
+          String rawRowString;
+          try {
+            rawRowString = value.toString();
+          } catch (Exception e2) {
+            rawRowString = "[Error getting row data with exception " +
+                StringUtils.stringifyException(e2) + " ]";
+          }
+          deserialize_error_count.set(deserialize_error_count.get() + 1);
+          throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
+        }
+        String rowString;
+        try {
+          rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector);
+        } catch (Exception e2) {
+          rowString = "[Error getting row data with exception " +
+              StringUtils.stringifyException(e2) + " ]";
+        }
+        throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
      }
-
-    // TODO: policy on deserialization errors
-    deserialize_error_count.set(deserialize_error_count.get() + 1);
-    throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
     }
-
-    // The row has been converted to comply with table schema, irrespective of partition schema.
-    // So, use tblOI (and not partOI) for forwarding
-    try {
-      forward(row, current.rowObjectInspector);
-    } catch (Exception e) {
-      // Serialize the row and output the error message.
-      String rowString;
-      try {
-        rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector);
-      } catch (Exception e2) {
-        rowString = "[Error getting row data with exception " +
-            StringUtils.stringifyException(e2) + " ]";
-      }
-      throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
+    if (childrenDone == currentCtxs.size()) {
+      setDone(true);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index a066f98..043bdd9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -213,8 +213,10 @@ public RowSchema getSchema() {
   protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
   @SuppressWarnings("rawtypes")
   protected transient OutputCollector out;
-  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
-  protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
+  protected transient Log LOG = LogFactory.getLog(getClass().getName());
+  protected transient Log PLOG = LogFactory.getLog(Operator.class.getName());
+  protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
+  protected transient boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
   protected transient String alias;
   protected transient Reporter reporter;
   protected transient String id;
@@ -490,35 +492,47 @@ public ObjectInspector getOutputObjInspector() {
   public abstract void processOp(Object row, int tag) throws HiveException;

   protected final void defaultStartGroup() throws HiveException {
-    LOG.debug("Starting group");
+    if (isLogDebugEnabled) {
+      LOG.debug("Starting group");
+    }

     if (childOperators == null) {
       return;
     }

-    LOG.debug("Starting group for children:");
+    if (isLogDebugEnabled) {
+      LOG.debug("Starting group for children:");
+    }

     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.setGroupKeyObjectInspector(groupKeyOI);
       op.setGroupKeyObject(groupKeyObject);
       op.startGroup();
     }

-    LOG.debug("Start group Done");
+    if (isLogDebugEnabled) {
+      LOG.debug("Start group Done");
+    }
   }

   protected final void defaultEndGroup() throws HiveException {
-    LOG.debug("Ending group");
+    if (isLogDebugEnabled) {
+      LOG.debug("Ending group");
+    }

     if (childOperators == null) {
       return;
     }

-    LOG.debug("Ending group for children:");
+    if (isLogDebugEnabled) {
+      LOG.debug("Ending group for children:");
+    }

     for (Operator<? extends OperatorDesc> op : childOperators) {
       op.endGroup();
     }

-    LOG.debug("End group Done");
+    if (isLogDebugEnabled) {
+      LOG.debug("End group Done");
+    }
   }

   // If a operator wants to do some work at the beginning of a group
@@ -1266,7 +1280,7 @@ public OpTraits getOpTraits() {
   }

   public void setOpTraits(OpTraits metaInfo) {
-    if (LOG.isDebugEnabled()) {
+    if (isLogDebugEnabled) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
     }
     if (conf != null) {
@@ -1277,7 +1291,7 @@ public void setOpTraits(OpTraits metaInfo) {
   }

   public void setStatistics(Statistics stats) {
-    if (LOG.isDebugEnabled()) {
+    if (isLogDebugEnabled) {
       LOG.debug("Setting stats ("+stats+") on "+this);
     }
     if (conf != null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 9bbc4ec..8272309 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Random;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -66,7 +64,6 @@ PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }

-  private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
   private static final long serialVersionUID = 1L;
   private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();

@@ -446,8 +443,11 @@ private int computeHashCode(Object row, int buckNum) throws HiveException {
             + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
       }
     }
-    LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
-    return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+    int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+    if (isLogDebugEnabled) {
+      LOG.debug("Going to return hash code " + hashCode);
+    }
+    return hashCode;
   }

   private boolean partitionKeysAreNull(Object row) throws HiveException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 155002a..7443f8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2042,15 +2042,21 @@ public static String getResourceFiles(Configuration conf, SessionState.ResourceT
   public static ClassLoader getSessionSpecifiedClassLoader() {
     SessionState state = SessionState.get();
     if (state == null || state.getConf() == null) {
-      LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+      }
       return JavaUtils.getClassLoader();
     }
     ClassLoader sessionCL = state.getConf().getClassLoader();
-    if (sessionCL != null){
-      LOG.debug("Use session specified class loader");
+    if (sessionCL != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Use session specified class loader");
+      }
       return sessionCL;
     }
-    LOG.debug("Session specified class loader not found, use thread based class loader");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Session specified class loader not found, use thread based class loader");
+    }
     return JavaUtils.getClassLoader();
   }

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index 311f6d6..2f7450e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -40,7 +40,15 @@ public void process(Writable value) throws HiveException {
     // The row has been converted to comply with table schema, irrespective of partition schema.
     // So, use tblOI (and not partOI) for forwarding
     try {
-      forward(value, current.getRowObjectInspector());
+      int childrenDone = 0;
+      for (MapOpCtx current : currentCtxs) {
+        if (!current.forward(value)) {
+          childrenDone++;
+        }
+      }
+      if (childrenDone == currentCtxs.size()) {
+        setDone(true);
+      }
     } catch (Exception e) {
       throw new HiveException("Hive Runtime Error while processing row ", e);
     }
diff --git ql/src/test/queries/clientpositive/join_vc.q ql/src/test/queries/clientpositive/join_vc.q
index 63b3da7..8d7dea9 100644
--- ql/src/test/queries/clientpositive/join_vc.q
+++ ql/src/test/queries/clientpositive/join_vc.q
@@ -3,3 +3,10 @@ explain
 select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3;

 select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3;
+
+explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100;
+
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100;
diff --git ql/src/test/results/clientpositive/join_vc.q.out ql/src/test/results/clientpositive/join_vc.q.out
index 12004ca..58e1450 100644
--- ql/src/test/results/clientpositive/join_vc.q.out
+++ ql/src/test/results/clientpositive/join_vc.q.out
@@ -137,3 +137,227 @@ POSTHOOK: Input: default@src
 0	238	val_238
 0	238	val_238
 0	238	val_238
+PREHOOK: query: explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t2
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (key < 100)) (type: boolean)
+              Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: string)
+                sort order: +
+                Map-reduce partition columns: key (type: string)
+                Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+                value expressions: BLOCK__OFFSET__INSIDE__FILE (type: bigint)
+          TableScan
+            alias: t1
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (key is not null and (key < 100)) (type: boolean)
+              Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: key (type: string)
+                sort order: +
+                Map-reduce partition columns: key (type: string)
+                Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 
+            1 {VALUE._col1}
+          outputColumnNames: _col7
+          Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col7 (type: bigint)
+            outputColumnNames: _col0
+            Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+2088
+2632
+968
+2088
+2632
+968
+2088
+2632
+968
+2846
+3170
+1720
+4362
+1720
+4362
+386
+2770
+386
+2770
+910
+5340
+5514
+5340
+5514
+2824
+4004
+1118
+4594
+1972
+4594
+1972
+2226
+5284
+2226
+5284
+34
+5616
+3494
+3592
+3192
+3138
+4012
+1238
+3138
+4012
+1238
+3138
+4012
+1238
+5626
+328
+5626
+328
+1218
+3388
+2030
+3298
+2030
+3298
+2330
+4068
+1198
+3060
+4540
+3864
+3060
+4540
+3864
+3060
+4540
+3864
+2308
+1462
+2308
+1462
+4186
+1440
+1024
+1906
+3128
+1906
+3128
+3516
+1592
+198
+1754
+5306
+1754
+5306
+3570
+3794
+4640
+4548
+3794
+4640
+4548
+3794
+4640
+4548
+2792
+1208
+2792
+1208
+3548
+3378
+3538
+3378
+3538
+2622
+3368
+1916
+4058
+396
+5070
+1674
+5070
+1674
+1872
+5606
+1872
+5606
+2612
+12
+2652
+5398
+2802
+5744
+4304
+2802
+5744
+4304
+2802
+5744
+4304
+1176
+3160
+2400
+3160
+2400
+2216
+5572
+5802
+5572
+5802
+92
+2458
+92
+2458
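
The core pattern the MapOperator/VectorMapOperator changes introduce, as a minimal standalone sketch (simplified, hypothetical class names — not the actual Hive classes, and not part of the patch): each row read from the current input path is offered to every per-alias pipeline registered for that path, pipelines that are done stop accepting rows, and the operator marks itself done once all pipelines for the current path have finished.

// Illustrative sketch only; hypothetical simplified types.
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiAliasDispatchSketch {
  // Stand-in for one child operator pipeline bound to an alias of the input path.
  static class AliasPipeline {
    final String alias;
    private boolean done;
    AliasPipeline(String alias) { this.alias = alias; }
    // Returns false once this pipeline no longer wants rows (e.g. a LIMIT was reached).
    boolean forward(Object row) {
      if (done) {
        return false;
      }
      // ... process the row ...
      return true;
    }
    void markDone() { done = true; }
  }

  // Pipelines registered per input path, mirroring opCtxMap keyed by path.
  private final Map<String, List<AliasPipeline>> pipelinesByPath = new HashMap<String, List<AliasPipeline>>();
  private List<AliasPipeline> currentPipelines = Collections.emptyList();
  private boolean done;

  // Analogous to cleanUpInputFileChangedOp(): switch to the pipelines of the new path.
  void onInputPathChanged(String path) {
    List<AliasPipeline> pipelines = pipelinesByPath.get(path);
    currentPipelines = (pipelines != null) ? pipelines : Collections.<AliasPipeline>emptyList();
  }

  // Analogous to the new process(): offer one row to every alias registered for this path.
  void process(Object row) {
    int childrenDone = 0;
    for (AliasPipeline pipeline : currentPipelines) {
      if (!pipeline.forward(row)) {
        childrenDone++;
      }
    }
    if (!currentPipelines.isEmpty() && childrenDone == currentPipelines.size()) {
      done = true; // analogous to setDone(true)
    }
  }
}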