Index: itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
===================================================================
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (revision 1619714)
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (working copy)
@@ -43,7 +43,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,10 +63,10 @@
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.cli.CliDriver;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.common.io.DigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.tools.ant.BuildException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
 
 import com.google.common.collect.ImmutableList;
 
@@ -145,8 +134,8 @@
   private QTestSetup setup = null;
   private boolean isSessionStateStarted = false;
 
-  private String initScript;
-  private String cleanupScript;
+  private final String initScript;
+  private final String cleanupScript;
 
   static {
     for (String srcTable : System.getProperty("test.src.tables", "").trim().split(",")) {
@@ -332,14 +321,6 @@
     HadoopShims shims = ShimLoader.getHadoopShims();
     int numberOfDataNodes = 4;
 
-    // can run tez tests only on hadoop 2
-    if (clusterType == MiniClusterType.tez) {
-      Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
-      // this is necessary temporarily - there's a probem with multi datanodes on MiniTezCluster
-      // will be fixed in 0.3
-      numberOfDataNodes = 1;
-    }
-
    if (clusterType != MiniClusterType.none) {
      dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
      FileSystem fs = dfs.getFileSystem();
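Note on the QTestUtil hunk above: the deleted block used JUnit's Assume to skip Tez tests outside the Hadoop 2 ("0.23") shims and to cap the mini cluster at one datanode while MiniTezCluster had a multi-datanode problem; dropping it lets Tez runs share the standard four-datanode setup, and the now-unused org.junit.Assume import goes with it. For readers unfamiliar with the mechanism, a minimal sketch of the Assume pattern the removed code relied on follows — the class name and the version probe are illustrative stand-ins, not from this patch:

import static org.junit.Assume.assumeTrue;

import org.junit.Test;

public class Hadoop2OnlyTest {

  // Hypothetical stand-in for ShimLoader.getMajorVersion().
  private static String hadoopMajorVersion() {
    return System.getProperty("test.hadoop.major.version", "0.23");
  }

  @Test
  public void runsOnlyOnHadoop2() {
    // A failed assumption reports the test as skipped rather than failed;
    // this is how QTestUtil previously bypassed Tez tests on Hadoop 1.
    assumeTrue("0.23".equals(hadoopMajorVersion()));
    // ... test body ...
  }
}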
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (revision 1619714)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (working copy)
@@ -346,7 +346,7 @@
 
   /**
    * Utility method to create a stripped down configuration for the MR partitioner.
-   * 
+   *
    * @param partitionerClassName
    *          the real MR partitioner class name
   * @param baseConf
@@ -427,7 +427,7 @@
 
     // use tez to combine splits
     boolean groupSplitsInInputInitializer;
-    
+
     DataSourceDescriptor dataSource;
     int numTasks = -1;
 
@@ -462,11 +462,12 @@
       }
     }
 
-    // set up the operator plan. Before setting up Inputs since the config is updated.
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
+
+      // set up the operator plan. (before setting up splits on the AM)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
       if (groupSplitsInInputInitializer) {
@@ -484,6 +485,9 @@
       dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf,
           new Path(tezDir, "split_" + mapWork.getName().replaceAll(" ", "_")), true);
       numTasks = dataSource.getNumberOfShards();
+
+      // set up the operator plan. (after generating splits - that changes configs)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
     }
 
     UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (revision 1619714)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (working copy)
@@ -64,6 +64,23 @@
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
 
+  public MapRecordProcessor(JobConf jconf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    execContext.setJc(jconf);
+    // create map and fetch operators
+    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+    if (mapWork == null) {
+      mapWork = Utilities.getMapWork(jconf);
+      cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
+    } else {
+      Utilities.setMapWork(jconf, mapWork);
+    }
+  }
+
   @Override
   void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
@@ -87,22 +104,7 @@
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
-
-      execContext.setJc(jconf);
-      // create map and fetch operators
-      mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mapWork);
-        l4j.info("Plan: "+mapWork);
-        for (String s: mapWork.getAliases()) {
-          l4j.info("Alias: "+s);
-        }
-      } else {
-        Utilities.setMapWork(jconf, mapWork);
-      }
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {
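Note on the two hunks above: in DagUtils, Utilities.setMapWork now runs inside each branch — before input setup when the AM generates the splits, but after configureMRInputWithLegacySplitGeneration when splits are generated client-side, since legacy split generation mutates the config the plan is serialized against. In MapRecordProcessor, the plan lookup moves out of init() into a constructor so the MapWork is available before inputs and outputs are wired up. Both sides lean on the same check-then-populate plan cache. A minimal standalone sketch of that pattern follows — the class, CACHE map, and loadPlan are hypothetical stand-ins for Hive's ObjectCache and Utilities.getMapWork, not this patch's code:

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class PlanCacheSketch {

  // Hypothetical stand-in for Hive's ObjectCache: one map per process, so a
  // reused container can skip plan deserialization on its next task.
  private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

  static final String MAP_PLAN_KEY = "__MAP_PLAN__";

  // Check-then-populate: return the cached plan, loading it only on a miss.
  // loadPlan stands in for Utilities.getMapWork(jconf).
  static Object getOrLoadPlan(Properties jconf) {
    Object plan = CACHE.get(MAP_PLAN_KEY);
    if (plan == null) {
      plan = loadPlan(jconf);
      CACHE.put(MAP_PLAN_KEY, plan);
    }
    return plan;
  }

  private static Object loadPlan(Properties jconf) {
    // In Hive this deserializes the MapWork plan from the job configuration;
    // here it just fabricates a placeholder.
    return "plan:" + jconf.getProperty("plan.path", "<none>");
  }
}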
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (revision 1619714)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (working copy)
@@ -130,7 +130,7 @@
       LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
       if (isMap) {
-        rproc = new MapRecordProcessor();
+        rproc = new MapRecordProcessor(jobConf);
         MRInputLegacy mrInput = getMRInput(inputs);
         try {
           mrInput.init();
@@ -201,6 +201,7 @@
       this.writer = (KeyValueWriter) output.getWriter();
     }
 
+    @Override
     public void collect(Object key, Object value) throws IOException {
       writer.write(key, value);
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (revision 1619714)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (working copy)
@@ -134,7 +134,7 @@
     }
 
     List<LocalResource> additionalLr = session.getLocalizedResources();
-    
+
     // log which resources we're adding (apart from the hive exec)
     if (LOG.isDebugEnabled()) {
       if (additionalLr == null || additionalLr.size() == 0) {
@@ -165,7 +165,7 @@
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
       TezSessionPoolManager.getInstance().returnSession(session);
 
-      if (LOG.isInfoEnabled()) {
+      if (LOG.isInfoEnabled() && counters != null) {
        for (CounterGroup group: counters) {
          LOG.info(group.getDisplayName() +":");
          for (TezCounter counter: group) {
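Note on the TezTask hunk above: getDAGCounters() can return null (for instance when the DAG ended in a state where counters were never collected), and the old code would then throw a NullPointerException from the for-each loop while merely trying to log. The TezProcessor hunks feed the jobConf into the new MapRecordProcessor constructor and add a missing @Override. A minimal sketch of the guarded-iteration pattern follows — fetchCounters and the counter names are illustrative stand-ins, not Tez API:

import java.util.List;
import java.util.logging.Logger;

public class GuardedCounterLog {

  private static final Logger LOG = Logger.getLogger("GuardedCounterLog");

  // Stand-in for client.getDAGStatus(statusGetOpts).getDAGCounters(),
  // which may return null.
  static List<String> fetchCounters(boolean available) {
    return available ? List.of("INPUT_RECORDS", "OUTPUT_RECORDS") : null;
  }

  public static void main(String[] args) {
    List<String> counters = fetchCounters(false);
    // Guard the nullable result before iterating, mirroring the
    // "isInfoEnabled() && counters != null" fix above.
    if (counters != null) {
      for (String counter : counters) {
        LOG.info(counter);
      }
    }
  }
}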