diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index 866f7c072b6f43f6cc8d29871e8e28b412ad3361..470c4e59d000a7f25566a13cd5e51805af12171a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; + +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; @@ -32,8 +35,6 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.shims.CombineHiveKey; -import java.io.IOException; - /** * Fast file merge operator for ORC files. */ @@ -72,6 +73,14 @@ private void processKeyValuePairs(Object key, Object value) } else { k = (OrcFileKeyWrapper) key; } + + // skip incompatible file, files that are missing stripe statistics are set to incompatible + if (k.isIncompatFile()) { + LOG.warn("Incompatible ORC file merge! Stripe statistics is missing. " + k.getInputPath()); + incompatFileSet.add(k.getInputPath()); + return; + } + filePath = k.getInputPath().toUri().getPath(); fixTmpPath(k.getInputPath().getParent()); @@ -81,9 +90,9 @@ private void processKeyValuePairs(Object key, Object value) if (prevPath == null) { prevPath = k.getInputPath(); reader = OrcFile.createReader(fs, k.getInputPath()); - if (isLogInfoEnabled) { - LOG.info("ORC merge file input path: " + k.getInputPath()); - } + if (isLogInfoEnabled) { + LOG.info("ORC merge file input path: " + k.getInputPath()); + } } // store the orc configuration from the first file. All other files should @@ -102,9 +111,9 @@ private void processKeyValuePairs(Object key, Object value) .version(version) .rowIndexStride(rowIndexStride) .inspector(reader.getObjectInspector())); - if (isLogDebugEnabled) { - LOG.info("ORC merge file output path: " + outPath); - } + if (isLogDebugEnabled) { + LOG.info("ORC merge file output path: " + outPath); + } } if (!checkCompatibility(k)) { @@ -128,9 +137,10 @@ private void processKeyValuePairs(Object key, Object value) v.getStripeStatistics()); if (isLogInfoEnabled) { - LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : " - + v.getStripeInformation().getOffset() + " length: " - + v.getStripeInformation().getLength() + " ]"); + LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : " + + v.getStripeInformation().getOffset() + " length: " + + v.getStripeInformation().getLength() + " row: " + + v.getStripeStatistics().getColStats(0).getNumberOfValues() + " ]"); } // add user metadata to footer in case of any @@ -139,9 +149,12 @@ private void processKeyValuePairs(Object key, Object value) } } catch (Throwable e) { this.exception = true; - closeOp(true); + LOG.error("Closing operator..Exception: " + ExceptionUtils.getStackTrace(e)); throw new HiveException(e); } finally { + if (exception) { + closeOp(true); + } if (fdis != null) { try { fdis.close(); @@ -157,43 +170,28 @@ private void processKeyValuePairs(Object key, Object value) private boolean checkCompatibility(OrcFileKeyWrapper k) { // check compatibility with subsequent files if ((k.getTypes().get(0).getSubtypesCount() != columnCount)) { - if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Column counts does not match for " - + k.getInputPath()); - } + LOG.warn("Incompatible ORC file merge! Column counts mismatch for " + k.getInputPath()); return false; } if (!k.getCompression().equals(compression)) { - if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Compression codec does not match" + - " for " + k.getInputPath()); - } + LOG.warn("Incompatible ORC file merge! Compression codec mismatch for " + k.getInputPath()); return false; } if (k.getCompressBufferSize() != compressBuffSize) { - if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Compression buffer size does not" + - " match for " + k.getInputPath()); - } + LOG.warn("Incompatible ORC file merge! Compression buffer size mismatch for " + k.getInputPath()); return false; } if (!k.getVersion().equals(version)) { - if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Version does not match for " - + k.getInputPath()); - } + LOG.warn("Incompatible ORC file merge! Version mismatch for " + k.getInputPath()); return false; } if (k.getRowIndexStride() != rowIndexStride) { - if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Row index stride does not match" + - " for " + k.getInputPath()); - } + LOG.warn("Incompatible ORC file merge! Row index stride mismatch for " + k.getInputPath()); return false; } @@ -232,7 +230,7 @@ public void closeOp(boolean abort) throws HiveException { outWriter.close(); outWriter = null; - } catch (IOException e) { + } catch (Exception e) { throw new HiveException("Unable to close OrcFileMergeOperator", e); } super.closeOp(abort); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java index ffba3c680bcb5b4aa56d4fd8b5d863fb878d95eb..0920dcff093d051534d3506110061257dfb9fddf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java @@ -57,9 +57,15 @@ void updateBoolean(boolean value) { @Override void merge(ColumnStatisticsImpl other) { + if (other instanceof BooleanStatisticsImpl) { + BooleanStatisticsImpl bkt = (BooleanStatisticsImpl) other; + trueCount += bkt.trueCount; + } else { + if (isStatsExists() && trueCount != 0) { + throw new IllegalArgumentException("Incompatible merging of boolean column statistics"); + } + } super.merge(other); - BooleanStatisticsImpl bkt = (BooleanStatisticsImpl) other; - trueCount += bkt.trueCount; } @Override @@ -149,28 +155,35 @@ void updateInteger(long value) { @Override void merge(ColumnStatisticsImpl other) { - IntegerStatisticsImpl otherInt = (IntegerStatisticsImpl) other; - if (!hasMinimum) { - hasMinimum = otherInt.hasMinimum; - minimum = otherInt.minimum; - maximum = otherInt.maximum; - } else if (otherInt.hasMinimum) { - if (otherInt.minimum < minimum) { + if (other instanceof IntegerStatisticsImpl) { + IntegerStatisticsImpl otherInt = (IntegerStatisticsImpl) other; + if (!hasMinimum) { + hasMinimum = otherInt.hasMinimum; minimum = otherInt.minimum; - } - if (otherInt.maximum > maximum) { maximum = otherInt.maximum; + } else if (otherInt.hasMinimum) { + if (otherInt.minimum < minimum) { + minimum = otherInt.minimum; + } + if (otherInt.maximum > maximum) { + maximum = otherInt.maximum; + } } - } - super.merge(other); - overflow |= otherInt.overflow; - if (!overflow) { - boolean wasPositive = sum >= 0; - sum += otherInt.sum; - if ((otherInt.sum >= 0) == wasPositive) { - overflow = (sum >= 0) != wasPositive; + + overflow |= otherInt.overflow; + if (!overflow) { + boolean wasPositive = sum >= 0; + sum += otherInt.sum; + if ((otherInt.sum >= 0) == wasPositive) { + overflow = (sum >= 0) != wasPositive; + } + } + } else { + if (isStatsExists() && hasMinimum) { + throw new IllegalArgumentException("Incompatible merging of integer column statistics"); } } + super.merge(other); } @Override @@ -276,21 +289,27 @@ void updateDouble(double value) { @Override void merge(ColumnStatisticsImpl other) { - super.merge(other); - DoubleStatisticsImpl dbl = (DoubleStatisticsImpl) other; - if (!hasMinimum) { - hasMinimum = dbl.hasMinimum; - minimum = dbl.minimum; - maximum = dbl.maximum; - } else if (dbl.hasMinimum) { - if (dbl.minimum < minimum) { + if (other instanceof DoubleStatisticsImpl) { + DoubleStatisticsImpl dbl = (DoubleStatisticsImpl) other; + if (!hasMinimum) { + hasMinimum = dbl.hasMinimum; minimum = dbl.minimum; - } - if (dbl.maximum > maximum) { maximum = dbl.maximum; + } else if (dbl.hasMinimum) { + if (dbl.minimum < minimum) { + minimum = dbl.minimum; + } + if (dbl.maximum > maximum) { + maximum = dbl.maximum; + } + } + sum += dbl.sum; + } else { + if (isStatsExists() && hasMinimum) { + throw new IllegalArgumentException("Incompatible merging of double column statistics"); } } - sum += dbl.sum; + super.merge(other); } @Override @@ -382,25 +401,31 @@ void updateString(Text value) { @Override void merge(ColumnStatisticsImpl other) { - super.merge(other); - StringStatisticsImpl str = (StringStatisticsImpl) other; - if (minimum == null) { - if(str.minimum != null) { - maximum = new Text(str.getMaximum()); - minimum = new Text(str.getMinimum()); - } else { + if (other instanceof StringStatisticsImpl) { + StringStatisticsImpl str = (StringStatisticsImpl) other; + if (minimum == null) { + if (str.minimum != null) { + maximum = new Text(str.getMaximum()); + minimum = new Text(str.getMinimum()); + } else { /* both are empty */ - maximum = minimum = null; + maximum = minimum = null; + } + } else if (str.minimum != null) { + if (minimum.compareTo(str.minimum) > 0) { + minimum = new Text(str.getMinimum()); + } + if (maximum.compareTo(str.maximum) < 0) { + maximum = new Text(str.getMaximum()); + } } - } else if (str.minimum != null) { - if (minimum.compareTo(str.minimum) > 0) { - minimum = new Text(str.getMinimum()); - } - if (maximum.compareTo(str.maximum) < 0) { - maximum = new Text(str.getMaximum()); + sum += str.sum; + } else { + if (isStatsExists() && minimum != null) { + throw new IllegalArgumentException("Incompatible merging of string column statistics"); } } - sum += str.sum; + super.merge(other); } @Override @@ -476,9 +501,15 @@ void updateBinary(BytesWritable value) { @Override void merge(ColumnStatisticsImpl other) { + if (other instanceof BinaryColumnStatistics) { + BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other; + sum += bin.sum; + } else { + if (isStatsExists() && sum != 0) { + throw new IllegalArgumentException("Incompatible merging of binary column statistics"); + } + } super.merge(other); - BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other; - sum += bin.sum; } @Override @@ -556,25 +587,31 @@ void updateDecimal(HiveDecimal value) { @Override void merge(ColumnStatisticsImpl other) { - super.merge(other); - DecimalStatisticsImpl dec = (DecimalStatisticsImpl) other; - if (minimum == null) { - minimum = dec.minimum; - maximum = dec.maximum; - sum = dec.sum; - } else if (dec.minimum != null) { - if (minimum.compareTo(dec.minimum) > 0) { + if (other instanceof DecimalStatisticsImpl) { + DecimalStatisticsImpl dec = (DecimalStatisticsImpl) other; + if (minimum == null) { minimum = dec.minimum; - } - if (maximum.compareTo(dec.maximum) < 0) { maximum = dec.maximum; + sum = dec.sum; + } else if (dec.minimum != null) { + if (minimum.compareTo(dec.minimum) > 0) { + minimum = dec.minimum; + } + if (maximum.compareTo(dec.maximum) < 0) { + maximum = dec.maximum; + } + if (sum == null || dec.sum == null) { + sum = null; + } else { + sum = sum.add(dec.sum); + } } - if (sum == null || dec.sum == null) { - sum = null; - } else { - sum = sum.add(dec.sum); + } else { + if (isStatsExists() && minimum != null) { + throw new IllegalArgumentException("Incompatible merging of decimal column statistics"); } } + super.merge(other); } @Override @@ -582,7 +619,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.DecimalStatistics.Builder dec = OrcProto.DecimalStatistics.newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { dec.setMinimum(minimum.toString()); dec.setMaximum(maximum.toString()); } @@ -666,19 +703,25 @@ void updateDate(DateWritable value) { @Override void merge(ColumnStatisticsImpl other) { - super.merge(other); - DateStatisticsImpl dateStats = (DateStatisticsImpl) other; - if (minimum == null) { - minimum = dateStats.minimum; - maximum = dateStats.maximum; - } else if (dateStats.minimum != null) { - if (minimum > dateStats.minimum) { + if (other instanceof DateStatisticsImpl) { + DateStatisticsImpl dateStats = (DateStatisticsImpl) other; + if (minimum == null) { minimum = dateStats.minimum; - } - if (maximum < dateStats.maximum) { maximum = dateStats.maximum; + } else if (dateStats.minimum != null) { + if (minimum > dateStats.minimum) { + minimum = dateStats.minimum; + } + if (maximum < dateStats.maximum) { + maximum = dateStats.maximum; + } + } + } else { + if (isStatsExists() && minimum != null) { + throw new IllegalArgumentException("Incompatible merging of date column statistics"); } } + super.merge(other); } @Override @@ -686,7 +729,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.DateStatistics.Builder dateStats = OrcProto.DateStatistics.newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { dateStats.setMinimum(minimum); dateStats.setMaximum(maximum); } @@ -769,19 +812,25 @@ void updateTimestamp(Timestamp value) { @Override void merge(ColumnStatisticsImpl other) { - super.merge(other); - TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other; - if (minimum == null) { - minimum = timestampStats.minimum; - maximum = timestampStats.maximum; - } else if (timestampStats.minimum != null) { - if (minimum > timestampStats.minimum) { + if (other instanceof TimestampStatisticsImpl) { + TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other; + if (minimum == null) { minimum = timestampStats.minimum; - } - if (maximum < timestampStats.maximum) { maximum = timestampStats.maximum; + } else if (timestampStats.minimum != null) { + if (minimum > timestampStats.minimum) { + minimum = timestampStats.minimum; + } + if (maximum < timestampStats.maximum) { + maximum = timestampStats.maximum; + } + } + } else { + if (isStatsExists() && minimum != null) { + throw new IllegalArgumentException("Incompatible merging of timestamp column statistics"); } } + super.merge(other); } @Override @@ -789,7 +838,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.TimestampStatistics.Builder timestampStats = OrcProto.TimestampStatistics .newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { timestampStats.setMinimum(minimum); timestampStats.setMaximum(maximum); } @@ -878,6 +927,10 @@ void updateTimestamp(Timestamp value) { throw new UnsupportedOperationException("Can't update timestamp"); } + boolean isStatsExists() { + return !(count == 0 && hasNull == false); + } + void merge(ColumnStatisticsImpl stats) { count += stats.count; hasNull |= stats.hasNull; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java index 11f05c663c4f6a3e8a4db4133efd20f9df584b42..a62fc1e82a8c1fe079a72e732efa46fe8cbc967c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java @@ -18,26 +18,35 @@ package org.apache.hadoop.hive.ql.io.orc; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.WritableComparable; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; + /** * Key for OrcFileMergeMapper task. Contains orc file related information that * should match before merging two orc files. */ public class OrcFileKeyWrapper implements WritableComparable