diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2185f85..7d670cf 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2232,6 +2232,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "Whether to generate the splits locally or in the AM (tez only)"),
     HIVE_TEZ_GENERATE_CONSISTENT_SPLITS("hive.tez.input.generate.consistent.splits", true,
         "Whether to generate consistent split locations when generating splits in the AM"),
+    HIVE_TEZ_GENERATE_CUSTOM_SPLIT_LOCATIONS("hive.tez.input.generate.custom.split.locations",
+        false,
+        "Whether to generate custom locations for splits instead of using the ones provided by the raw splits"),
     HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"),
     HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"),
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 388b5f3..be811eb 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,6 +31,13 @@
   public Map<String, ServiceInstance> getAll();
 
   /**
+   * Gets a list containing all the instances. This list has the same iteration order across
+   * different processes, assuming the underlying set of registry entries is the same.
+   * @return a consistently ordered list of all instances
+   */
+  public List<ServiceInstance> getAllInstancesOrdered();
+
+  /**
    * Get an instance by worker identity.
    *
    * @param name
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 34e0682..660f978 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -17,8 +17,13 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -175,7 +180,8 @@ public String toString() {
 
   private final class FixedServiceInstanceSet implements ServiceInstanceSet {
 
-    private final Map<String, ServiceInstance> instances = new HashMap<>();
+    // LinkedHashMap has a repeatable iteration order.
+    private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
 
     public FixedServiceInstanceSet() {
       for (String host : hosts) {
@@ -190,6 +196,19 @@ public FixedServiceInstanceSet() {
     }
 
     @Override
+    public List<ServiceInstance> getAllInstancesOrdered() {
+      List<ServiceInstance> list = new LinkedList<>();
+      list.addAll(instances.values());
+      Collections.sort(list, new Comparator<ServiceInstance>() {
+        @Override
+        public int compare(ServiceInstance o1, ServiceInstance o2) {
+          return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+        }
+      });
+      return list;
+    }
+
+    @Override
     public ServiceInstance getInstance(String name) {
       return instances.get(name);
     }
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index a8e1465..06cdde3 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -14,6 +14,8 @@
 package org.apache.hadoop.hive.llap.registry.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
@@ -30,11 +32,44 @@
   private ServiceRegistry registry = null;
   private final boolean isDaemon;
 
+  private static final Map<String, LlapRegistryService> yarnRegistries = new HashMap<>();
+
   public LlapRegistryService(boolean isDaemon) {
     super("LlapRegistryService");
     this.isDaemon = isDaemon;
   }
 
+  /**
+   * Helper method to get a ServiceRegistry instance to read from the registry.
+   * This should not be used by LLAP daemons.
+   *
+   * @param conf {@link Configuration} instance which contains service registry information.
+   * @return a started {@link LlapRegistryService} client
+   */
+  public static synchronized LlapRegistryService getClient(Configuration conf) {
+    String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
+    LlapRegistryService registry;
+    if (hosts.startsWith("@")) {
+      // Caching instances only in case of the YARN registry. Each host-based list will get its own copy.
+      String name = hosts.substring(1);
+      if (yarnRegistries.containsKey(name)) {
+        registry = yarnRegistries.get(name);
+      } else {
+        registry = new LlapRegistryService(false);
+        registry.init(conf);
+        registry.start();
+        yarnRegistries.put(name, registry);
+      }
+    } else {
+      registry = new LlapRegistryService(false);
+      registry.init(conf);
+      registry.start();
+    }
+    LOG.info("Using LLAP registry (client) type: " + registry);
+    return registry;
+  }
+
   @Override
   public void serviceInit(Configuration conf) {
     String hosts = conf.getTrimmed(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS);
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
index d474b6f..52c761c 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapYarnRegistryImpl.java
@@ -20,15 +20,20 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -91,6 +96,8 @@ public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
     LOG.info("Llap Registry is enabled with registryid: " + instanceName);
     this.conf = new Configuration(conf);
+    // TODO: yarn-site.xml is typically not available on a cluster node (AM / daemon). It would have to
+    // be shipped, or the provided Configuration would have to contain the YARN details.
     conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); // registry reference
     client = (RegistryOperationsService) RegistryOperationsFactory.createInstance(conf);
@@ -256,16 +263,47 @@ public String toString() {
     // LinkedHashMap to retain iteration order.
     private final Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
     @Override
-    public synchronized Map<String, ServiceInstance> getAll() {
+    public Map<String, ServiceInstance> getAll() {
       // Return a copy. Instances may be modified during a refresh.
-      return new LinkedHashMap<>(instances);
+      readLock.lock();
+      try {
+        return new LinkedHashMap<>(instances);
+      } finally {
+        readLock.unlock();
+      }
     }
 
     @Override
-    public synchronized ServiceInstance getInstance(String name) {
-      return instances.get(name);
+    public List<ServiceInstance> getAllInstancesOrdered() {
+      List<ServiceInstance> list = new LinkedList<>();
+      readLock.lock();
+      try {
+        list.addAll(instances.values());
+      } finally {
+        readLock.unlock();
+      }
+      Collections.sort(list, new Comparator<ServiceInstance>() {
+        @Override
+        public int compare(ServiceInstance o1, ServiceInstance o2) {
+          return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+        }
+      });
+      return list;
+    }
+
+    @Override
+    public ServiceInstance getInstance(String name) {
+      readLock.lock();
+      try {
+        return instances.get(name);
+      } finally {
+        readLock.unlock();
+      }
     }
 
     @Override
@@ -277,7 +315,8 @@ public void refresh() throws IOException {
       Map<String, ServiceRecord> records =
          RegistryUtils.listServiceRecords(client, RegistryPathUtils.parentOf(path));
       // Synchronize after reading the service records from the external service (ZK)
-      synchronized (this) {
+      writeLock.lock();
+      try {
         Set<String> latestKeys = new HashSet<>();
         LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
         for (ServiceRecord rec : records.values()) {
@@ -320,28 +359,34 @@
         } else {
           this.instances.putAll(freshInstances);
         }
+      } finally {
+        writeLock.unlock();
       }
     }
 
     @Override
-    public synchronized Set<ServiceInstance> getByHost(String host) {
+    public Set<ServiceInstance> getByHost(String host) {
       // TODO Maybe store this as a map which is populated during construction, to avoid walking
       // the map on each request.
+      readLock.lock();
       Set<ServiceInstance> byHost = new HashSet<>();
-
-      for (ServiceInstance i : instances.values()) {
-        if (host.equals(i.getHost())) {
-          // all hosts in instances should be alive in this impl
-          byHost.add(i);
+      try {
+        for (ServiceInstance i : instances.values()) {
+          if (host.equals(i.getHost())) {
+            // all hosts in instances should be alive in this impl
+            byHost.add(i);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Locality comparing " + host + " to " + i.getHost());
+          }
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Locality comparing " + host + " to " + i.getHost());
+          LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
         }
+        return byHost;
+      } finally {
+        readLock.unlock();
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
-      }
-      return byHost;
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index fe1ef37..3b1f9ea 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -271,19 +271,20 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
         HashMultimap.<Integer, InputSplit> create();
     boolean secondLevelGroupingDone = false;
     if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+      // TODO: Does the custom split provider need to be used here?
      for (Integer key : bucketToInitialSplitMap.keySet()) {
         InputSplit[] inputSplitArray =
             (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
         Multimap<Integer, InputSplit> groupedSplit =
             grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                availableSlots, inputName, mainWorkName.isEmpty());
+                availableSlots, inputName, mainWorkName.isEmpty(), null);
         if (mainWorkName.isEmpty() == false) {
           Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
               HashMultimap.<Integer, InputSplit> create();
           singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
           groupedSplit =
               grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
-                  HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+                  HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES), null);
           secondLevelGroupingDone = true;
         }
         bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
@@ -298,7 +299,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
             (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
         Multimap<Integer, InputSplit> groupedSplit =
             grouper.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                availableSlots, inputName, false);
+                availableSlots, inputName, false, null);
         bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
       }
       /*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 8902acb..d1e3ae3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -24,15 +24,19 @@
 import java.util.BitSet;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -42,6 +46,7 @@
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
@@ -73,6 +78,7 @@ public class HiveSplitGenerator extends InputInitializer {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveSplitGenerator.class);
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private final DynamicPartitionPruner pruner;
   private final Configuration conf;
@@ -80,6 +86,7 @@
   private final MRInputUserPayloadProto userPayloadProto;
   private final MapWork work;
   private final SplitGrouper splitGrouper = new SplitGrouper();
+  private final SplitLocationProvider splitLocationProvider;
   private static final String MIN_SPLIT_SIZE;
 
   @SuppressWarnings("unused")
@@ -102,6 +109,29 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException {
     this.jobConf = new JobConf(conf);
 
+    boolean useCustomLocations =
+        HiveConf.getBoolVar(conf,
+            HiveConf.ConfVars.HIVE_TEZ_GENERATE_CUSTOM_SPLIT_LOCATIONS);
+    LOG.info("SplitGenerator using custom locations: " + useCustomLocations);
+    if (useCustomLocations) {
+      LlapRegistryService serviceRegistry;
+      serviceRegistry = LlapRegistryService.getClient(conf);
+
+      List<ServiceInstance> serviceInstances =
+          serviceRegistry.getInstances().getAllInstancesOrdered();
+      String[] locations = new String[serviceInstances.size()];
+      int i = 0;
+      for (ServiceInstance serviceInstance : serviceInstances) {
+        if (isDebugEnabled) {
+          LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname="
+              + serviceInstance.getHost() + " to list for split locations");
+        }
+        locations[i++] = serviceInstance.getHost();
+      }
+      // Using a static set for now. There's no guarantee that other nodes will see the same set.
+      this.splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+    } else {
+      this.splitLocationProvider = null;
+    }
+
     // Read all credentials into the credentials instance stored in JobConf.
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
@@ -160,6 +190,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException {
       conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
           TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
+    // Raw splits
     InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
     // Sort the splits, so that subsequent grouping is consistent.
     Arrays.sort(splits, new InputSplitComparator());
@@ -170,11 +201,13 @@
       splits = pruneBuckets(work, splits);
     }
 
+    // TODO: This grouping could destroy the ordering of splits, which would make consistent
+    // splits useless. Verify this.
     Multimap<Integer, InputSplit> groupedSplits =
-        splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+        splitGrouper.generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, splitLocationProvider);
     // And finally return them in a flat array
     InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
-    LOG.info("Number of grouped splits: " + flatSplits.length);
+    LOG.info("Number of split groups (by schema): " + flatSplits.length);
 
     List<TaskLocationHint> locationHints =
         splitGrouper.createTaskLocationHints(flatSplits, generateConsistentSplits);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
new file mode 100644
index 0000000..d6ff53e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.split.SplitLocationProvider; +import org.apache.hive.common.util.Murmur3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This maps a split (path + offset) to an index based on the number of locations provided. + * + * If locations do not change across jobs, the intention is to map the same split to the same node. + * + * A big issue is when nodes change (added, removed, temporarily removed and re-added) etc. That changes + * the number of locations / position of locations - and will cause a huge cache miss. + * + * TODO: Support for consistent hashing when combining the split location generator and the ServiceRegistry. + * + */ +public class HostAffinitySplitLocationProvider implements SplitLocationProvider { + + private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class); + private final boolean isTraceEnabled = LOG.isTraceEnabled(); + + private final String[] knownLocations; + + public HostAffinitySplitLocationProvider(String[] knownLocations) { + + Preconditions.checkState(knownLocations != null && knownLocations.length != 0, + HostAffinitySplitLocationProvider.class.getName() + + "needs at least 1 location to function"); + this.knownLocations = knownLocations; + } + + @Override + public String[] getLocations(InputSplit split) throws IOException { + if (split instanceof FileSplit) { + FileSplit fsplit = (FileSplit) split; + long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart()); + int indexRaw = (int) (hash % knownLocations.length); + int index = Math.abs(indexRaw); + if (isTraceEnabled) { + LOG.trace( + "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + + fsplit.getLength() + " mapped to index=" + index + ", location=" + + knownLocations[index]); + } + return new String[]{knownLocations[index]}; + } else { + if (isTraceEnabled) { + LOG.trace("Split: " + split + " is not a FileSplit. 
+
+  private long generateHash(String path, long startOffset) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    dob.writeLong(startOffset);
+    dob.writeUTF(path);
+    return Murmur3.hash64(dob.getData(), 0, dob.getLength());
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index aaaa6a5..006d6ff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -23,7 +23,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +41,7 @@
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -65,14 +65,13 @@
   private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
 
-
   /**
    * group splits for each bucket separately - while evenly filling all the
    * available slots with tasks
    */
   public Multimap<Integer, InputSplit> group(Configuration conf,
-      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves)
+      Multimap<Integer, InputSplit> bucketSplitMultimap, int availableSlots, float waves,
+      SplitLocationProvider splitLocationProvider)
       throws IOException {
 
     // figure out how many tasks we want for each bucket
@@ -90,9 +89,9 @@
       InputSplit[] rawSplits = inputSplitCollection.toArray(new InputSplit[0]);
       InputSplit[] groupedSplits =
           tezGrouper.getGroupedSplits(conf, rawSplits, bucketTaskMap.get(bucketId),
-              HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator());
+              HiveInputFormat.class.getName(), new ColumnarSplitSizeEstimator(), splitLocationProvider);
 
-      LOG.info("Original split size is " + rawSplits.length + " grouped split size is "
+      LOG.info("Original split count is " + rawSplits.length + " grouped split count is "
           + groupedSplits.length + ", for bucket: " + bucketId);
 
       for (InputSplit inSplit : groupedSplits) {
@@ -155,9 +154,10 @@
   public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
       Configuration conf, InputSplit[] splits,
-      float waves, int availableSlots)
+      float waves, int availableSlots,
+      SplitLocationProvider locationProvider)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true, locationProvider);
   }
 
   /** Generate groups of splits, separated by schema evolution boundaries */
@@ -166,7 +166,8 @@
       InputSplit[] splits, float waves, int availableSlots,
       String inputName,
-      boolean groupAcrossFiles) throws
+      boolean groupAcrossFiles,
+      SplitLocationProvider locationProvider) throws
       Exception {
 
     MapWork work = populateMapWork(jobConf, inputName);
@@ -188,7 +189,7 @@
     // group them into the chunks we want
     Multimap<Integer, InputSplit> groupedSplits =
-        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
+        this.group(jobConf, bucketSplitMultiMap, availableSlots, waves, locationProvider);
 
     return groupedSplits;
   }
@@ -207,6 +208,8 @@
     // mapping of bucket id to number of required tasks to run
     Map<Integer, Integer> bucketTaskMap = new HashMap<>();
 
+    // TODO HIVE-12255. Make use of SplitSizeEstimator.
+    // The actual task computation needs to be looked at as well.
     // compute the total size per bucket
     long totalSize = 0;
     boolean earlyExit = false;
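
A note for reviewers on the ordering contract that getAllInstancesOrdered() introduces: sorting on the worker identity makes the resulting list independent of the order in which registry entries happen to be discovered, which is what lets every process compute the same index-to-worker mapping. A minimal, self-contained sketch of that property (WorkerInfo is a hypothetical stand-in for ServiceInstance; all names here are illustrative, not part of the patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class OrderedInstancesDemo {

  static final class WorkerInfo {
    final String workerIdentity;
    final String host;
    WorkerInfo(String workerIdentity, String host) {
      this.workerIdentity = workerIdentity;
      this.host = host;
    }
  }

  static List<WorkerInfo> ordered(List<WorkerInfo> discovered) {
    List<WorkerInfo> copy = new ArrayList<>(discovered);
    // Sort on a stable, process-independent key (the worker identity).
    copy.sort(Comparator.comparing((WorkerInfo w) -> w.workerIdentity));
    return copy;
  }

  public static void main(String[] args) {
    // Two clients discover the same entries in different orders ...
    List<WorkerInfo> seenByClientA = Arrays.asList(
        new WorkerInfo("worker-0002", "node-b"), new WorkerInfo("worker-0001", "node-a"));
    List<WorkerInfo> seenByClientB = Arrays.asList(
        new WorkerInfo("worker-0001", "node-a"), new WorkerInfo("worker-0002", "node-b"));
    // ... but both compute the same ordered list, so index 0 agrees everywhere.
    System.out.println(ordered(seenByClientA).get(0).host); // node-a
    System.out.println(ordered(seenByClientB).get(0).host); // node-a
  }
}
```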
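The split-to-host mapping itself only needs a stable hash of (path, offset) reduced modulo that ordered host list. Below is a self-contained sketch of the idea, with the caveat the class javadoc already calls out: changing the host list remaps most splits. The patch hashes with Murmur3.hash64 over a DataOutputBuffer; the FNV-1a hash and host names below are stand-ins chosen only to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HostAffinityDemo {

  /** FNV-1a 64-bit, standing in for Murmur3.hash64; any stable hash works. */
  static long hash64(byte[] data) {
    long h = 0xcbf29ce484222325L;
    for (byte b : data) {
      h ^= (b & 0xffL);
      h *= 0x100000001b3L;
    }
    return h;
  }

  static String chooseHost(String path, long offset, String[] hosts) {
    byte[] key = (offset + ":" + path).getBytes(StandardCharsets.UTF_8);
    // Math.abs over the modulo result, as in the patch; the remainder is
    // always smaller than hosts.length, so integer overflow cannot occur.
    int index = Math.abs((int) (hash64(key) % hosts.length));
    return hosts[index];
  }

  public static void main(String[] args) {
    String[] hosts = {"llap-node-1", "llap-node-2", "llap-node-3"};
    // The same (path, offset) always maps to the same host ...
    System.out.println(chooseHost("/warehouse/t1/part-00000", 0L, hosts));
    System.out.println(chooseHost("/warehouse/t1/part-00000", 0L, hosts));
    // ... but shrinking the host list remaps most splits (the cache-miss
    // problem noted in the javadoc; consistent hashing would reduce it).
    System.out.println(chooseHost("/warehouse/t1/part-00000", 0L,
        Arrays.copyOf(hosts, 2)));
  }
}
```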
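Finally, the locking change in LlapYarnRegistryImpl swaps synchronized methods for a ReentrantReadWriteLock, so concurrent readers (getAll, getInstance, getByHost) no longer serialize behind each other while refresh() alone takes the write lock. A simplified sketch of the pattern, with types reduced to plain strings (an assumption for brevity, not the patch's types):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedInstanceMap {
  private final Map<String, String> instances = new LinkedHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /** Read path: copy under the read lock; callers never observe a mid-refresh state. */
  public Map<String, String> getAll() {
    lock.readLock().lock();
    try {
      return new LinkedHashMap<>(instances);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Write path: refresh replaces the contents under the write lock. */
  public void refresh(Map<String, String> fresh) {
    lock.writeLock().lock();
    try {
      instances.clear();
      instances.putAll(fresh);
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```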