diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 05aab4a..60be4ef 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;

 import com.google.common.annotations.VisibleForTesting;
@@ -528,6 +530,13 @@ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {

   @Override
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // The AM cannot do Kerberos authentication, so do the input split generation in HS2 instead.
+      LOG.debug("Setting {} to {} to enable split generation on HS2", HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(),
+          Boolean.FALSE.toString()
+      );
+      jobConf.set(HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString());
+    }
     try {
       DruidStorageHandlerUtils.addDependencyJars(jobConf, DruidRecordWriter.class);
     } catch (IOException e) {
@@ -607,11 +616,16 @@ private static HttpClient makeHttpClient(Lifecycle lifecycle) {
         numConnection, readTimeout.toStandardDuration().getMillis()
     );
-    return HttpClientInit.createClient(
+    final HttpClient httpClient = HttpClientInit.createClient(
         HttpClientConfig.builder().withNumConnections(numConnection)
             .withReadTimeout(new Period(readTimeout).toStandardDuration()).build(), lifecycle
     );
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Building Kerberos HTTP client");
+      return new KerberosHttpClient(httpClient);
+    }
+    return httpClient;
   }

   public static HttpClient getHttpClient() {
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/security/DruidKerberosUtil.java druid-handler/src/java/org/apache/hadoop/hive/druid/security/DruidKerberosUtil.java
new file mode 100644
index 0000000..e837925
--- /dev/null
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/security/DruidKerberosUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.security;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.CookieStore;
+import java.net.HttpCookie;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class DruidKerberosUtil
+{
+  protected static final Logger log = LoggerFactory.getLogger(DruidKerberosUtil.class);
+
+  private static final Base64 base64codec = new Base64(0);
+
+  // A fair reentrant lock
+  private static ReentrantLock kerberosLock = new ReentrantLock(true);
+
+  /**
+   * This method always needs to be called within a doAs block so that the client's TGT credentials
+   * can be read from the Subject.
+   *
+   * @return Kerberos challenge string
+   *
+   * @throws AuthenticationException if the GSS handshake or the Oid lookup fails
+   */
+  public static String kerberosChallenge(String server) throws AuthenticationException
+  {
+    kerberosLock.lock();
+    try {
+      // This is the Oid for the Kerberos GSS-API mechanism.
+      Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
+      GSSManager manager = GSSManager.getInstance();
+      // GSS name for the server
+      GSSName serverName = manager.createName("HTTP@" + server, GSSName.NT_HOSTBASED_SERVICE);
+      // Create a GSSContext for authentication with the service.
+      // We're passing client credentials as null since we want them to be read from the Subject.
+      GSSContext gssContext =
+          manager.createContext(serverName.canonicalize(mechOid), mechOid, null, GSSContext.DEFAULT_LIFETIME);
+      gssContext.requestMutualAuth(true);
+      gssContext.requestCredDeleg(true);
+      // Establish the context
+      byte[] inToken = new byte[0];
+      byte[] outToken = gssContext.initSecContext(inToken, 0, inToken.length);
+      gssContext.dispose();
+      // Base64-encoded and stringified token for the server
+      log.info("Got valid challenge for host {}", serverName);
+      return new String(base64codec.encode(outToken), StandardCharsets.US_ASCII);
+    }
+    catch (GSSException | IllegalAccessException | NoSuchFieldException | ClassNotFoundException e) {
+      throw new AuthenticationException(e);
+    }
+    finally {
+      kerberosLock.unlock();
+    }
+  }
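+
+  // For illustration: KerberosHttpClient (below) sends the token returned here as an HTTP
+  // request header of the form "Authorization: Negotiate <base64-token>".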
+
+  public static HttpCookie getAuthCookie(CookieStore cookieStore, URI uri)
+  {
+    if (cookieStore == null) {
+      return null;
+    }
+    boolean isSSL = uri.getScheme().equals("https");
+    List<HttpCookie> cookies = cookieStore.getCookies();
+
+    for (HttpCookie c : cookies) {
+      // If this is a secured cookie and the current connection is non-secured,
+      // skip it: the cookie replay would not be transmitted to the server.
+      if (c.getSecure() && !isSSL) {
+        continue;
+      }
+      if (c.getName().equals(AuthenticatedURL.AUTH_COOKIE)) {
+        return c;
+      }
+    }
+    return null;
+  }
+
+  public static void removeAuthCookie(CookieStore cookieStore, URI uri)
+  {
+    HttpCookie authCookie = getAuthCookie(cookieStore, uri);
+    if (authCookie != null) {
+      cookieStore.remove(uri, authCookie);
+    }
+  }
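+
+  // Credentials only need to be sent when no auth cookie from a previous handshake is
+  // available for the URI; otherwise the cookie (AuthenticatedURL.AUTH_COOKIE, i.e.
+  // "hadoop.auth") is replayed instead of a new Kerberos challenge.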
+  public static boolean needToSendCredentials(CookieStore cookieStore, URI uri)
+  {
+    return getAuthCookie(cookieStore, uri) == null;
+  }
+}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/security/KerberosHttpClient.java druid-handler/src/java/org/apache/hadoop/hive/druid/security/KerberosHttpClient.java
new file mode 100644
index 0000000..a0f3d72
--- /dev/null
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/security/KerberosHttpClient.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.security;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.metamx.http.client.AbstractHttpClient;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.HttpResponseHandler;
+import io.druid.concurrent.Execs;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.joda.time.Duration;
+import org.slf4j.LoggerFactory;
+
+import java.net.CookieManager;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * This is a slightly modified version of the Kerberos module borrowed from the Druid project.
+ * There are a couple of reasons for copying/modifying it instead of pulling it in as a Maven dependency:
+ * 1/ the authentication step had to be removed, since it is not required here;
+ * 2/ it avoids some unneeded transitive dependencies that can clash on the classpath, like jetty-XX.
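+ *
+ * For illustration, the typical wiring (mirroring makeHttpClient in DruidStorageHandler):
+ *   HttpClient client = HttpClientInit.createClient(config, lifecycle);
+ *   if (UserGroupInformation.isSecurityEnabled()) {
+ *     client = new KerberosHttpClient(client);
+ *   }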
+ */
+public class KerberosHttpClient extends AbstractHttpClient
+{
+  protected static final org.slf4j.Logger log = LoggerFactory.getLogger(KerberosHttpClient.class);
+
+  private final HttpClient delegate;
+  private final CookieManager cookieManager;
+  private final Executor exec = Execs.singleThreaded("KerberosHttpClient-%s");
+
+  public KerberosHttpClient(HttpClient delegate)
+  {
+    this.delegate = delegate;
+    this.cookieManager = new CookieManager();
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration
+  )
+  {
+    final SettableFuture<Final> retVal = SettableFuture.create();
+    inner_go(request, httpResponseHandler, duration, retVal);
+    return retVal;
+  }
+
+  private <Intermediate, Final> void inner_go(
+      final Request request,
+      final HttpResponseHandler<Intermediate, Final> httpResponseHandler,
+      final Duration duration,
+      final SettableFuture<Final> future
+  )
+  {
+    try {
+      final String host = request.getUrl().getHost();
+      final URI uri = request.getUrl().toURI();
+
+      Map<String, List<String>> cookieMap = cookieManager.get(uri, Collections.<String, List<String>>emptyMap());
+      for (Map.Entry<String, List<String>> entry : cookieMap.entrySet()) {
+        request.addHeaderValues(entry.getKey(), entry.getValue());
+      }
+      final boolean should_retry_on_unauthorized_response;
+
+      if (DruidKerberosUtil.needToSendCredentials(cookieManager.getCookieStore(), uri)) {
+        // No cookies for the requested URI: authenticate the user and add the authentication header.
+        log.info(
+            "No auth cookie found for URI {}. Existing cookies: {}. Authenticating...",
+            uri,
+            cookieManager.getCookieStore().getCookies()
+        );
+        // Assuming that a valid UGI with Kerberos credentials was created by HS2 or LLAP.
+        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+        log.debug("The user credential is {}", currentUser);
+        String challenge = currentUser.doAs(new PrivilegedExceptionAction<String>()
+        {
+          @Override
+          public String run() throws Exception
+          {
+            return DruidKerberosUtil.kerberosChallenge(host);
+          }
+        });
+        request.setHeader(HttpHeaders.Names.AUTHORIZATION, "Negotiate " + challenge);
+        should_retry_on_unauthorized_response = false;
+      } else {
+        should_retry_on_unauthorized_response = true;
+      }
+
+      ListenableFuture<RetryResponseHolder<Final>> internalFuture = delegate.go(
+          request,
+          new RetryIfUnauthorizedResponseHandler<Intermediate, Final>(new ResponseCookieHandler<Intermediate, Final>(
+              request.getUrl().toURI(),
+              cookieManager,
+              httpResponseHandler
+          )),
+          duration
+      );
+
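+      // On a 401 the response holder is flagged for retry: drop the stale auth cookie and
+      // re-issue the request with a fresh challenge; otherwise complete the future with the
+      // delegate's response.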
+      Futures.addCallback(internalFuture, new FutureCallback<RetryResponseHolder<Final>>()
+      {
+        @Override
+        public void onSuccess(RetryResponseHolder<Final> result)
+        {
+          if (should_retry_on_unauthorized_response && result.shouldRetry()) {
+            log.info("Preparing for retry");
+            // Remove the auth cookie
+            DruidKerberosUtil.removeAuthCookie(cookieManager.getCookieStore(), uri);
+            // Clear the existing cookie header
+            request.setHeader("Cookie", "");
+            inner_go(request.copy(), httpResponseHandler, duration, future);
+          } else {
+            log.debug("Not retrying; returning the future response");
+            future.set(result.getObj());
+          }
+        }
+
+        @Override
+        public void onFailure(Throwable t)
+        {
+          future.setException(t);
+        }
+      }, exec);
+    }
+    catch (Throwable e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/security/ResponseCookieHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/security/ResponseCookieHandler.java
new file mode 100644
index 0000000..b84c5e4
--- /dev/null
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/security/ResponseCookieHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.security;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.HttpResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.CookieManager;
+import java.net.URI;
+import java.util.List;
+
+public class ResponseCookieHandler<Intermediate, Final> implements HttpResponseHandler<Intermediate, Final>
+{
+  protected static final Logger log = LoggerFactory.getLogger(ResponseCookieHandler.class);
+
+  private final URI uri;
+  private final CookieManager manager;
+  private final HttpResponseHandler<Intermediate, Final> delegate;
+
+  public ResponseCookieHandler(URI uri, CookieManager manager, HttpResponseHandler<Intermediate, Final> delegate)
+  {
+    this.uri = uri;
+    this.manager = manager;
+    this.delegate = delegate;
+  }
+
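+  /**
+   * Records the cookies carried in the response headers into the shared CookieManager
+   * before handing the response off to the wrapped handler.
+   */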
+  @Override
+  public ClientResponse<Intermediate> handleResponse(HttpResponse httpResponse)
+  {
+    try {
+      final HttpHeaders headers = httpResponse.headers();
+      manager.put(uri, Maps.asMap(headers.names(), new Function<String, List<String>>()
+      {
+        @Override
+        public List<String> apply(String input)
+        {
+          return headers.getAll(input);
+        }
+      }));
+    }
+    catch (IOException e) {
+      log.error("Error while processing cookies from the response header", e);
+    }
+    finally {
+      return delegate.handleResponse(httpResponse);
+    }
+  }
+
+  @Override
+  public ClientResponse<Intermediate> handleChunk(
+      ClientResponse<Intermediate> clientResponse, HttpChunk httpChunk
+  )
+  {
+    return delegate.handleChunk(clientResponse, httpChunk);
+  }
+
+  @Override
+  public ClientResponse<Final> done(ClientResponse<Intermediate> clientResponse)
+  {
+    return delegate.done(clientResponse);
+  }
+
+  @Override
+  public void exceptionCaught(ClientResponse<Intermediate> clientResponse, Throwable throwable)
+  {
+    delegate.exceptionCaught(clientResponse, throwable);
+  }
+}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryIfUnauthorizedResponseHandler.java druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryIfUnauthorizedResponseHandler.java
new file mode 100644
index 0000000..acdebd7
--- /dev/null
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryIfUnauthorizedResponseHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.security;
+
+import com.metamx.http.client.response.ClientResponse;
+import com.metamx.http.client.response.HttpResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryIfUnauthorizedResponseHandler<Intermediate, Final>
+    implements HttpResponseHandler<RetryResponseHolder<Intermediate>, RetryResponseHolder<Final>>
+{
+  protected static final Logger log = LoggerFactory.getLogger(RetryIfUnauthorizedResponseHandler.class);
+
+  private final HttpResponseHandler<Intermediate, Final> httpResponseHandler;
+
+  public RetryIfUnauthorizedResponseHandler(HttpResponseHandler<Intermediate, Final> httpResponseHandler)
+  {
+    this.httpResponseHandler = httpResponseHandler;
+  }
+
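+  /**
+   * A 401 Unauthorized response is drained and replaced with a retry marker; any other
+   * response is forwarded to the wrapped handler and marked as non-retryable.
+   */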
+  @Override
+  public ClientResponse<RetryResponseHolder<Intermediate>> handleResponse(HttpResponse httpResponse)
+  {
+    log.debug("UnauthorizedResponseHandler - Got response status {}", httpResponse.getStatus());
+    if (httpResponse.getStatus().equals(HttpResponseStatus.UNAUTHORIZED)) {
+      // Drain the buffer
+      httpResponse.getContent().toString();
+      return ClientResponse.unfinished(RetryResponseHolder.<Intermediate>retry());
+    } else {
+      return wrap(httpResponseHandler.handleResponse(httpResponse));
+    }
+  }
+
+  @Override
+  public ClientResponse<RetryResponseHolder<Intermediate>> handleChunk(
+      ClientResponse<RetryResponseHolder<Intermediate>> clientResponse, HttpChunk httpChunk
+  )
+  {
+    if (clientResponse.getObj().shouldRetry()) {
+      // Drain the buffer
+      httpChunk.getContent().toString();
+      return clientResponse;
+    } else {
+      return wrap(httpResponseHandler.handleChunk(unwrap(clientResponse), httpChunk));
+    }
+  }
+
+  @Override
+  public ClientResponse<RetryResponseHolder<Final>> done(ClientResponse<RetryResponseHolder<Intermediate>> clientResponse)
+  {
+    if (clientResponse.getObj().shouldRetry()) {
+      return ClientResponse.finished(RetryResponseHolder.<Final>retry());
+    } else {
+      return wrap(httpResponseHandler.done(unwrap(clientResponse)));
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ClientResponse<RetryResponseHolder<Intermediate>> clientResponse, Throwable throwable)
+  {
+    httpResponseHandler.exceptionCaught(unwrap(clientResponse), throwable);
+  }
+
+  private <T> ClientResponse<RetryResponseHolder<T>> wrap(ClientResponse<T> response)
+  {
+    if (response.isFinished()) {
+      return ClientResponse.finished(new RetryResponseHolder<T>(false, response.getObj()));
+    } else {
+      return ClientResponse.unfinished(new RetryResponseHolder<T>(false, response.getObj()));
+    }
+  }
+
+  private <T> ClientResponse<T> unwrap(ClientResponse<RetryResponseHolder<T>> response)
+  {
+    if (response.isFinished()) {
+      return ClientResponse.finished(response.getObj().getObj());
+    } else {
+      return ClientResponse.unfinished(response.getObj().getObj());
+    }
+  }
+}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryResponseHolder.java druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryResponseHolder.java
new file mode 100644
index 0000000..2199e74
--- /dev/null
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/security/RetryResponseHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.security;
+
+public class RetryResponseHolder<T>
+{
+  private final boolean shouldRetry;
+  private final T obj;
+
+  public RetryResponseHolder(boolean shouldRetry, T obj)
+  {
+    this.shouldRetry = shouldRetry;
+    this.obj = obj;
+  }
+
+  public static <T> RetryResponseHolder<T> retry()
+  {
+    return new RetryResponseHolder<T>(true, null);
+  }
+
+  public boolean shouldRetry()
+  {
+    return shouldRetry;
+  }
+
+  public T getObj()
+  {
+    return obj;
+  }
+}
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index ee9dcb3..90938aa 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -67,8 +67,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
-import org.joda.time.Period;
-import org.joda.time.format.ISODateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,11 +74,6 @@
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.common.parsers.ParseException;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;

 import io.druid.query.Druids;
 import io.druid.query.Druids.SegmentMetadataQueryBuilder;
@@ -88,8 +81,6 @@
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.dimension.ExtractionDimensionSpec;
-import io.druid.query.extraction.TimeFormatExtractionFn;
 import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.metadata.metadata.ColumnAnalysis;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
@@ -284,6 +275,8 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
     // Retrieve results
     List<SegmentAnalysis> resultsList;
     try {
+      // This will throw an exception if the response from Druid is not an array;
+      // this happens, for instance, when Druid query execution returns an exception instead of an array of results.
       resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
           new TypeReference<List<SegmentAnalysis>>() {
           });
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 64e0d9f..e7f2400 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -31,7 +31,6 @@
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,6 +43,9 @@
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -650,9 +652,17 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
       conf.setBoolean(Utilities.VECTOR_MODE, mapWork.getVectorMode());
       conf.setBoolean(Utilities.USE_VECTORIZED_INPUT_FILE_FORMAT, mapWork.getUseVectorizedInputFileFormat());
-      dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
-          "split_" + mapWork.getName().replaceAll(" ", "_")), true);
-      numTasks = dataSource.getNumberOfShards();
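+      // Generate the splits in memory in HS2 (rather than through the legacy on-disk
+      // split file) and ship them to the vertex via the MRInput user payload;
+      // MRInputSplitDistributor then hands them out to the tasks at runtime.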
+      InputSplitInfo inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(conf, false, 0);
+      InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName());
+      InputDescriptor inputDescriptor = InputDescriptor.create(MRInputLegacy.class.getName())
+          .setUserPayload(UserPayload
+              .create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+                  .setConfigurationBytes(TezUtils.createByteStringFromConf(conf))
+                  .setSplits(inputSplitInfo.getSplitsProto()).build().toByteString()
+                  .asReadOnlyByteBuffer()));
+
+      dataSource = DataSourceDescriptor.create(inputDescriptor, descriptor, null);
+      numTasks = inputSplitInfo.getNumTasks();

       // set up the operator plan. (after generating splits - that changes configs)
       Utilities.setMapWork(conf, mapWork, mrScratchDir, false);