diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 4352ffe..64c1552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -96,8 +96,8 @@
   protected transient MapJoinPersistableTableContainer[] mapJoinTables;
   protected transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
 
-  private final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
-  private final MapJoinEagerRowContainer EMPTY_ROW_CONTAINER = new MapJoinEagerRowContainer();
+  private final Object[] emptyObjectArray = new Object[0];
+  private final MapJoinEagerRowContainer emptyRowContainer = new MapJoinEagerRowContainer();
   private long rowNumber = 0;
   protected transient LogHelper console;
@@ -118,7 +118,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
     memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
-    EMPTY_ROW_CONTAINER.addRow(EMPTY_OBJECT_ARRAY);
+    emptyRowContainer.addRow(emptyObjectArray);
 
     // for small tables only; so get the big table position first
     posBigTableAlias = conf.getPosBigTable();
@@ -229,7 +229,7 @@ public void processOp(Object row, int tag) throws HiveException {
       MapJoinKeyObject key = new MapJoinKeyObject();
       key.readFromRow(currentKey, joinKeysObjectInspectors[alias]);
 
-      Object[] value = EMPTY_OBJECT_ARRAY;
+      Object[] value = emptyObjectArray;
       if((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
         value = JoinUtil.computeMapJoinValues(row, joinValues[alias], joinValuesObjectInspectors[alias],
             joinFilters[alias], joinFilterObjectInspectors[alias],
@@ -242,14 +242,14 @@
         rowContainer = new MapJoinEagerRowContainer();
         rowContainer.addRow(value);
       } else {
-        rowContainer = EMPTY_ROW_CONTAINER;
+        rowContainer = emptyRowContainer;
       }
       rowNumber++;
       if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
         memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
       }
       tableContainer.put(key, rowContainer);
-    } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+    } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();
       rowContainer.addRow(value);
       tableContainer.put(key, rowContainer);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
index fcbe887..2f137f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
@@ -26,13 +26,13 @@ public abstract class HivePairFlatMapFunction implements PairFlatMapFunction {
-  private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
-  private static final NumberFormat STAGE_ID_FORMAT = NumberFormat.getInstance();
-  static {
-    TASK_ID_FORMAT.setGroupingUsed(false);
-    TASK_ID_FORMAT.setMinimumIntegerDigits(6);
-    STAGE_ID_FORMAT.setGroupingUsed(false);
-    STAGE_ID_FORMAT.setMinimumIntegerDigits(4);
+  private final NumberFormat taskIdFormat = NumberFormat.getInstance();
+  private final NumberFormat stageIdFormat = NumberFormat.getInstance();
+  {
+    taskIdFormat.setGroupingUsed(false);
+    taskIdFormat.setMinimumIntegerDigits(6);
+    stageIdFormat.setGroupingUsed(false);
+    stageIdFormat.setMinimumIntegerDigits(4);
   }
 
   protected transient JobConf jobConf;
@@ -60,7 +60,7 @@ private void setupMRLegacyConfigs() {
     StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
     taskAttemptIdBuilder.append(System.currentTimeMillis())
         .append("_")
-        .append(STAGE_ID_FORMAT.format(TaskContext.get().stageId()))
+        .append(stageIdFormat.format(TaskContext.get().stageId()))
         .append("_");
 
     if (isMap()) {
@@ -71,7 +71,7 @@
     // Spark task attempt id is increased by Spark context instead of task, which may introduce
     // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
-    taskAttemptIdBuilder.append(TASK_ID_FORMAT.format(TaskContext.get().partitionId()))
+    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
         .append("_0");
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
index 8ec2a9e..286816b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
@@ -55,13 +55,13 @@
     try {
       jobConf.write(new DataOutputStream(out));
     } catch (IOException e) {
-      LOG.error("Error serializing job configuration", e);
+      LOG.error("Error serializing job configuration: " + e, e);
       return null;
     } finally {
       try {
         out.close();
       } catch (IOException e) {
-        LOG.error("Error closing output stream", e);
+        LOG.error("Error closing output stream: " + e, e);
       }
     }
@@ -74,8 +74,8 @@ public static JobConf deserializeJobConf(byte[] buffer) {
     try {
       conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer)));
     } catch (IOException e) {
-      LOG.error("Error de-serializing job configuration");
-      return null;
+      String msg = "Error de-serializing job configuration: " + e;
+      throw new IllegalStateException(msg, e);
     }
     return conf;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 4970828..819fce7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -57,9 +56,9 @@
  *
  */
 public class SparkMapRecordHandler extends SparkRecordHandler {
+  private static final Log LOG = LogFactory.getLog(SparkMapRecordHandler.class);
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
-  public static final Log LOG = LogFactory.getLog(SparkMapRecordHandler.class);
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
   private ExecMapperContext execContext;
@@ -125,7 +124,7 @@
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        throw new RuntimeException("Map operator initialization failed", e);
+        throw new RuntimeException("Map operator initialization failed: " + e, e);
       }
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
@@ -149,8 +148,9 @@ public void processRow(Object key, Object value) throws IOException {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        LOG.fatal(StringUtils.stringifyException(e));
-        throw new RuntimeException(e);
+        String msg = "Error processing row: " + e;
+        LOG.fatal(msg, e);
+        throw new RuntimeException(msg, e);
       }
     }
   }
@@ -196,8 +196,9 @@ public void close() {
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
-        LOG.error("Hit error while closing operators - failing tree");
-        throw new IllegalStateException("Error while closing operators", e);
+        String msg = "Hit error while closing operators - failing tree: " + e;
+        LOG.error(msg, e);
+        throw new IllegalStateException(msg, e);
       }
     } finally {
       MapredContext.close();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 70f893a..52e6be8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -322,10 +322,6 @@ public static void processSkewJoin(JoinOperator joinOp, Task inputs, Set outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
index c38660b..2d9fb52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
@@ -42,7 +42,7 @@ public SparkEdgeProperty(long edgeType) {
   public boolean isShuffleNone() {
     return edgeType == SHUFFLE_NONE;
   }
-  
+
   public void setShuffleNone() {
     edgeType = SHUFFLE_NONE;
   }
@@ -80,7 +80,7 @@ public String getShuffleType() {
     if (isShuffleNone()) {
       return "NONE";
     }
-    
+
     StringBuilder sb = new StringBuilder();
     if (isShuffleGroup()) {
       sb.append("GROUP");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
index 6f4cd53..bf7d027 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
@@ -53,7 +53,8 @@ public boolean publishStat(String fileID, Map stats) {
       try {
         reporter.incrCounter(fileID, entry.getKey(), Long.valueOf(entry.getValue()));
       } catch (Exception e) {
-        LOG.error("Failed to increment counter value " + entry.getValue() + " for " + entry.getKey());
+        LOG.error("Failed to increment counter value " + entry.getValue() + " for " + entry.getKey()
+            + ": " + e, e);
         return false;
       }
     }
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 7b719d3..d287bfd 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -71,7 +71,6 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.mapreduce.util.ResourceBundles;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;