diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 6d63797..12e6ea0 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -59,6 +59,8 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -137,15 +139,25 @@ public LlapBaseInputFormat() {} LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder = new LlapRecordReaderTaskUmbilicalExternalResponder(); - // TODO: close this LlapTaskUmbilicalExternalClient llapClient = new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(), submitWorkInfo.getToken(), umbilicalResponder, llapToken); llapClient.init(job); llapClient.start(); + int attemptNum = 0; + // Use task attempt number from conf if provided + TaskAttemptID taskAttemptId = TaskAttemptID.forName(job.get(MRJobConfig.TASK_ATTEMPT_ID)); + if (taskAttemptId != null) { + attemptNum = taskAttemptId.getId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting attempt number to " + attemptNum + " from task attempt ID in conf: " + + job.get(MRJobConfig.TASK_ATTEMPT_ID)); + } + } + SubmitWorkRequestProto request = constructSubmitWorkRequestProto( - submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(), + submitWorkInfo, llapSplit.getSplitNum(), attemptNum, llapClient.getAddress(), submitWorkInfo.getToken(), llapSplit.getFragmentBytes(), llapSplit.getFragmentBytesSignature()); llapClient.submitWork(request, host, llapSubmitPort); @@ -275,7 +287,7 @@ private ServiceInstance selectServiceInstance(Set serviceInstan } private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo, - int taskNum, InetSocketAddress address, Token token, + int taskNum, int attemptNum, InetSocketAddress address, Token token, byte[] fragmentBytes, byte[] fragmentBytesSignature) throws IOException { ApplicationId appId = submitWorkInfo.getFakeAppId(); @@ -284,7 +296,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo su LOG.info("Setting user in submitWorkRequest to: " + user); ContainerId containerId = - ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); + ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, attemptNum), taskNum); // Credentials can change across DAGs. Ideally construct only once per DAG. Credentials credentials = new Credentials(); @@ -309,7 +321,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo su } builder.setWorkSpec(vertexBuilder.build()); builder.setFragmentNumber(taskNum); - builder.setAttemptNumber(0); // TODO: hmm + builder.setAttemptNumber(attemptNum); builder.setContainerIdString(containerId.toString()); builder.setAmHost(address.getHostName()); builder.setAmPort(address.getPort());