diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index e1a9a42..f606ec0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -90,7 +90,8 @@
   private static Map connectOps = new TreeMap();
 
-  public MapRecordProcessor(final JobConf jconf) throws Exception {
+  public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
+    super(jconf, context);
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
@@ -113,10 +114,10 @@ public Object call() {
   }
 
   @Override
-  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(MRTaskReporter mrReporter,
       Map inputs, Map outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, processorContext, mrReporter, inputs, outputs);
+    super.init(mrReporter, inputs, outputs);
 
     MapredContext.init(true, new JobConf(jconf));
     ((TezContext) MapredContext.get()).setInputs(inputs);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index 8a8fe55..b95ab42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -64,12 +64,16 @@
   private final Object[] row = new Object[2];
   ObjectCache cache;
 
+  public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
+    super(jconf, context);
+  }
+
   @Override
-  void init(final JobConf jconf, ProcessorContext processorContext,
+  void init(
       MRTaskReporter mrReporter, Map inputs,
       Map outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, processorContext, mrReporter, inputs, outputs);
+    super.init(mrReporter, inputs, outputs);
     execContext = new ExecMapperContext(jconf);
 
     //Update JobConf using MRInput, info like filename comes via this
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
index e341d40..9b7b7ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
@@ -38,7 +38,7 @@ public MergeFileTezProcessor(ProcessorContext context) {
   @Override
   public void run(Map inputs, Map outputs)
       throws Exception {
-    rproc = new MergeFileRecordProcessor();
+    rproc = new MergeFileRecordProcessor(jobConf, getContext());
     initializeAndRunProcessor(inputs, outputs);
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 15a2b5c..c563d9d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -50,11 +50,11 @@
  */
 public abstract class RecordProcessor {
 
-  protected JobConf jconf;
+  protected final JobConf jconf;
   protected Map inputs;
   protected Map outputs;
   protected Map outMap;
-  protected ProcessorContext processorContext;
+  protected final ProcessorContext processorContext;
 
   public static final Log l4j =
       LogFactory.getLog(RecordProcessor.class);
@@ -67,22 +67,23 @@
   protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
   protected String CLASS_NAME = RecordProcessor.class.getName();
 
+  public RecordProcessor(JobConf jConf, ProcessorContext processorContext) {
+    this.jconf = jConf;
+    this.processorContext = processorContext;
+  }
+
   /**
    * Common initialization code for RecordProcessors
-   * @param jconf
-   * @param processorContext the {@link ProcessorContext}
    * @param mrReporter
    * @param inputs map of Input names to {@link LogicalInput}s
    * @param outputs map of Output names to {@link LogicalOutput}s
    * @throws Exception
    */
-  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(MRTaskReporter mrReporter,
       Map inputs, Map outputs) throws Exception {
-    this.jconf = jconf;
     this.reporter = mrReporter;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.processorContext = processorContext;
 
     isLogInfoEnabled = l4j.isInfoEnabled();
     isLogTraceEnabled = l4j.isTraceEnabled();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index cc8e6d7..b89937d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -83,7 +83,8 @@
   private boolean abort;
 
-  public ReduceRecordProcessor(final JobConf jconf) throws Exception {
+  public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
+    super(jconf, context);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -103,11 +104,11 @@ public Object call() {
   }
 
   @Override
-  void init(JobConf jconf, ProcessorContext processorContext,
+  void init(
       MRTaskReporter mrReporter, Map inputs,
       Map outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, processorContext, mrReporter, inputs, outputs);
+    super.init(mrReporter, inputs, outputs);
 
     MapredContext.init(false, new JobConf(jconf));
     List shuffleInputs = getShuffleInputs(inputs);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index f073deb..39f9db6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -129,9 +129,9 @@ public void run(Map inputs, Map out
     LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
     if (isMap) {
-      rproc = new MapRecordProcessor(jobConf);
+      rproc = new MapRecordProcessor(jobConf, getContext());
     } else {
-      rproc = new ReduceRecordProcessor(jobConf);
+      rproc = new ReduceRecordProcessor(jobConf, getContext());
     }
 
     initializeAndRunProcessor(inputs, outputs);
@@ -144,7 +144,7 @@ protected void initializeAndRunProcessor(Map inputs,
     try {
 
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
-      rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
+      rproc.init(mrReporter, inputs, outputs);
       rproc.run();
 
       //done - output does not need to be committed as hive does not use outputcommitter
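For readers skimming the patch, the sketch below condenses the pattern the diff applies to every processor: the JobConf and ProcessorContext move from init() into the constructor so the base-class fields can be final, and init() shrinks to the per-run wiring (reporter, inputs, outputs). It is a minimal, self-contained illustration assuming placeholder types (Conf, Context, Reporter) and *Sketch class names, not the real Hive/Tez classes or signatures.

import java.util.HashMap;
import java.util.Map;

// Stand-ins for JobConf, ProcessorContext and MRTaskReporter (hypothetical, for illustration only).
class Conf { }
class Context { }
class Reporter { }

// Base class: configuration and context are constructor-injected and final,
// so subclasses can rely on them from their own constructors onwards.
abstract class RecordProcessorSketch {
  protected final Conf jconf;
  protected final Context processorContext;

  protected Reporter reporter;
  protected Map<String, Object> inputs;
  protected Map<String, Object> outputs;

  RecordProcessorSketch(Conf jconf, Context processorContext) {
    this.jconf = jconf;
    this.processorContext = processorContext;
  }

  // init() now only receives the per-run collaborators.
  void init(Reporter reporter, Map<String, Object> inputs, Map<String, Object> outputs) {
    this.reporter = reporter;
    this.inputs = inputs;
    this.outputs = outputs;
  }

  abstract void run() throws Exception;
}

class MapRecordProcessorSketch extends RecordProcessorSketch {
  MapRecordProcessorSketch(Conf jconf, Context context) {
    super(jconf, context);
    // jconf is already usable here, e.g. for cache lookups keyed on the conf.
  }

  @Override
  void run() {
    // consume inputs / produce outputs that were wired up by init()
  }
}

// Caller-side shape, analogous to TezProcessor.run() followed by initializeAndRunProcessor().
class Caller {
  public static void main(String[] args) throws Exception {
    RecordProcessorSketch rproc = new MapRecordProcessorSketch(new Conf(), new Context());
    rproc.init(new Reporter(), new HashMap<>(), new HashMap<>());
    rproc.run();
  }
}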