diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index 866f7c072b6f43f6cc8d29871e8e28b412ad3361..cb95447f34c3dd34aa161ad7095283408953f27a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; + +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; @@ -32,8 +35,6 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.shims.CombineHiveKey; -import java.io.IOException; - /** * Fast file merge operator for ORC files. */ @@ -81,9 +82,9 @@ private void processKeyValuePairs(Object key, Object value) if (prevPath == null) { prevPath = k.getInputPath(); reader = OrcFile.createReader(fs, k.getInputPath()); - if (isLogInfoEnabled) { - LOG.info("ORC merge file input path: " + k.getInputPath()); - } + if (isLogInfoEnabled) { + LOG.info("ORC merge file input path: " + k.getInputPath()); + } } // store the orc configuration from the first file. All other files should @@ -102,9 +103,9 @@ private void processKeyValuePairs(Object key, Object value) .version(version) .rowIndexStride(rowIndexStride) .inspector(reader.getObjectInspector())); - if (isLogDebugEnabled) { - LOG.info("ORC merge file output path: " + outPath); - } + if (isLogDebugEnabled) { + LOG.info("ORC merge file output path: " + outPath); + } } if (!checkCompatibility(k)) { @@ -128,9 +129,10 @@ private void processKeyValuePairs(Object key, Object value) v.getStripeStatistics()); if (isLogInfoEnabled) { - LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : " - + v.getStripeInformation().getOffset() + " length: " - + v.getStripeInformation().getLength() + " ]"); + LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : " + + v.getStripeInformation().getOffset() + " length: " + + v.getStripeInformation().getLength() + " row: " + + v.getStripeStatistics().getColStats(0).getNumberOfValues() + " ]"); } // add user metadata to footer in case of any @@ -139,9 +141,12 @@ private void processKeyValuePairs(Object key, Object value) } } catch (Throwable e) { this.exception = true; - closeOp(true); + LOG.error("Closing operator..Exception: " + ExceptionUtils.getStackTrace(e)); throw new HiveException(e); } finally { + if (exception) { + closeOp(true); + } if (fdis != null) { try { fdis.close(); @@ -158,24 +163,24 @@ private boolean checkCompatibility(OrcFileKeyWrapper k) { // check compatibility with subsequent files if ((k.getTypes().get(0).getSubtypesCount() != columnCount)) { if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Column counts does not match for " - + k.getInputPath()); + LOG.info("Incompatible ORC file merge! Column counts does not match for " + + k.getInputPath()); } return false; } if (!k.getCompression().equals(compression)) { if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Compression codec does not match" + - " for " + k.getInputPath()); + LOG.info("Incompatible ORC file merge! Compression codec does not match" + + " for " + k.getInputPath()); } return false; } if (k.getCompressBufferSize() != compressBuffSize) { if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Compression buffer size does not" + - " match for " + k.getInputPath()); + LOG.info("Incompatible ORC file merge! Compression buffer size does not" + + " match for " + k.getInputPath()); } return false; @@ -183,16 +188,16 @@ private boolean checkCompatibility(OrcFileKeyWrapper k) { if (!k.getVersion().equals(version)) { if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Version does not match for " - + k.getInputPath()); + LOG.info("Incompatible ORC file merge! Version does not match for " + + k.getInputPath()); } return false; } if (k.getRowIndexStride() != rowIndexStride) { if (isLogInfoEnabled) { - LOG.info("Incompatible ORC file merge! Row index stride does not match" + - " for " + k.getInputPath()); + LOG.info("Incompatible ORC file merge! Row index stride does not match" + + " for " + k.getInputPath()); } return false; } @@ -232,7 +237,7 @@ public void closeOp(boolean abort) throws HiveException { outWriter.close(); outWriter = null; - } catch (IOException e) { + } catch (Exception e) { throw new HiveException("Unable to close OrcFileMergeOperator", e); } super.closeOp(abort); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java index ffba3c680bcb5b4aa56d4fd8b5d863fb878d95eb..93f2565d7ebfea7c44b75a7359c4f330cc331521 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java @@ -58,8 +58,10 @@ void updateBoolean(boolean value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - BooleanStatisticsImpl bkt = (BooleanStatisticsImpl) other; - trueCount += bkt.trueCount; + if (other instanceof BooleanStatisticsImpl) { + BooleanStatisticsImpl bkt = (BooleanStatisticsImpl) other; + trueCount += bkt.trueCount; + } } @Override @@ -149,26 +151,29 @@ void updateInteger(long value) { @Override void merge(ColumnStatisticsImpl other) { - IntegerStatisticsImpl otherInt = (IntegerStatisticsImpl) other; - if (!hasMinimum) { - hasMinimum = otherInt.hasMinimum; - minimum = otherInt.minimum; - maximum = otherInt.maximum; - } else if (otherInt.hasMinimum) { - if (otherInt.minimum < minimum) { + super.merge(other); + if (other instanceof IntegerStatisticsImpl) { + IntegerStatisticsImpl otherInt = (IntegerStatisticsImpl) other; + if (!hasMinimum) { + hasMinimum = otherInt.hasMinimum; minimum = otherInt.minimum; - } - if (otherInt.maximum > maximum) { maximum = otherInt.maximum; + } else if (otherInt.hasMinimum) { + if (otherInt.minimum < minimum) { + minimum = otherInt.minimum; + } + if (otherInt.maximum > maximum) { + maximum = otherInt.maximum; + } } - } - super.merge(other); - overflow |= otherInt.overflow; - if (!overflow) { - boolean wasPositive = sum >= 0; - sum += otherInt.sum; - if ((otherInt.sum >= 0) == wasPositive) { - overflow = (sum >= 0) != wasPositive; + + overflow |= otherInt.overflow; + if (!overflow) { + boolean wasPositive = sum >= 0; + sum += otherInt.sum; + if ((otherInt.sum >= 0) == wasPositive) { + overflow = (sum >= 0) != wasPositive; + } } } } @@ -277,20 +282,22 @@ void updateDouble(double value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - DoubleStatisticsImpl dbl = (DoubleStatisticsImpl) other; - if (!hasMinimum) { - hasMinimum = dbl.hasMinimum; - minimum = dbl.minimum; - maximum = dbl.maximum; - } else if (dbl.hasMinimum) { - if (dbl.minimum < minimum) { + if (other instanceof DoubleStatisticsImpl) { + DoubleStatisticsImpl dbl = (DoubleStatisticsImpl) other; + if (!hasMinimum) { + hasMinimum = dbl.hasMinimum; minimum = dbl.minimum; - } - if (dbl.maximum > maximum) { maximum = dbl.maximum; + } else if (dbl.hasMinimum) { + if (dbl.minimum < minimum) { + minimum = dbl.minimum; + } + if (dbl.maximum > maximum) { + maximum = dbl.maximum; + } } + sum += dbl.sum; } - sum += dbl.sum; } @Override @@ -383,24 +390,26 @@ void updateString(Text value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - StringStatisticsImpl str = (StringStatisticsImpl) other; - if (minimum == null) { - if(str.minimum != null) { - maximum = new Text(str.getMaximum()); - minimum = new Text(str.getMinimum()); - } else { + if (other instanceof StringStatisticsImpl) { + StringStatisticsImpl str = (StringStatisticsImpl) other; + if (minimum == null) { + if (str.minimum != null) { + maximum = new Text(str.getMaximum()); + minimum = new Text(str.getMinimum()); + } else { /* both are empty */ - maximum = minimum = null; - } - } else if (str.minimum != null) { - if (minimum.compareTo(str.minimum) > 0) { - minimum = new Text(str.getMinimum()); - } - if (maximum.compareTo(str.maximum) < 0) { - maximum = new Text(str.getMaximum()); + maximum = minimum = null; + } + } else if (str.minimum != null) { + if (minimum.compareTo(str.minimum) > 0) { + minimum = new Text(str.getMinimum()); + } + if (maximum.compareTo(str.maximum) < 0) { + maximum = new Text(str.getMaximum()); + } } + sum += str.sum; } - sum += str.sum; } @Override @@ -477,8 +486,10 @@ void updateBinary(BytesWritable value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other; - sum += bin.sum; + if (other instanceof BinaryColumnStatistics) { + BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other; + sum += bin.sum; + } } @Override @@ -557,22 +568,24 @@ void updateDecimal(HiveDecimal value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - DecimalStatisticsImpl dec = (DecimalStatisticsImpl) other; - if (minimum == null) { - minimum = dec.minimum; - maximum = dec.maximum; - sum = dec.sum; - } else if (dec.minimum != null) { - if (minimum.compareTo(dec.minimum) > 0) { + if (other instanceof DecimalStatisticsImpl) { + DecimalStatisticsImpl dec = (DecimalStatisticsImpl) other; + if (minimum == null) { minimum = dec.minimum; - } - if (maximum.compareTo(dec.maximum) < 0) { maximum = dec.maximum; - } - if (sum == null || dec.sum == null) { - sum = null; - } else { - sum = sum.add(dec.sum); + sum = dec.sum; + } else if (dec.minimum != null) { + if (minimum.compareTo(dec.minimum) > 0) { + minimum = dec.minimum; + } + if (maximum.compareTo(dec.maximum) < 0) { + maximum = dec.maximum; + } + if (sum == null || dec.sum == null) { + sum = null; + } else { + sum = sum.add(dec.sum); + } } } } @@ -582,7 +595,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.DecimalStatistics.Builder dec = OrcProto.DecimalStatistics.newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { dec.setMinimum(minimum.toString()); dec.setMaximum(maximum.toString()); } @@ -667,16 +680,18 @@ void updateDate(DateWritable value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - DateStatisticsImpl dateStats = (DateStatisticsImpl) other; - if (minimum == null) { - minimum = dateStats.minimum; - maximum = dateStats.maximum; - } else if (dateStats.minimum != null) { - if (minimum > dateStats.minimum) { + if (other instanceof DateStatisticsImpl) { + DateStatisticsImpl dateStats = (DateStatisticsImpl) other; + if (minimum == null) { minimum = dateStats.minimum; - } - if (maximum < dateStats.maximum) { maximum = dateStats.maximum; + } else if (dateStats.minimum != null) { + if (minimum > dateStats.minimum) { + minimum = dateStats.minimum; + } + if (maximum < dateStats.maximum) { + maximum = dateStats.maximum; + } } } } @@ -686,7 +701,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.DateStatistics.Builder dateStats = OrcProto.DateStatistics.newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { dateStats.setMinimum(minimum); dateStats.setMaximum(maximum); } @@ -770,16 +785,18 @@ void updateTimestamp(Timestamp value) { @Override void merge(ColumnStatisticsImpl other) { super.merge(other); - TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other; - if (minimum == null) { - minimum = timestampStats.minimum; - maximum = timestampStats.maximum; - } else if (timestampStats.minimum != null) { - if (minimum > timestampStats.minimum) { + if (other instanceof TimestampStatisticsImpl) { + TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other; + if (minimum == null) { minimum = timestampStats.minimum; - } - if (maximum < timestampStats.maximum) { maximum = timestampStats.maximum; + } else if (timestampStats.minimum != null) { + if (minimum > timestampStats.minimum) { + minimum = timestampStats.minimum; + } + if (maximum < timestampStats.maximum) { + maximum = timestampStats.maximum; + } } } } @@ -789,7 +806,7 @@ void merge(ColumnStatisticsImpl other) { OrcProto.ColumnStatistics.Builder result = super.serialize(); OrcProto.TimestampStatistics.Builder timestampStats = OrcProto.TimestampStatistics .newBuilder(); - if (getNumberOfValues() != 0) { + if (getNumberOfValues() != 0 && minimum != null) { timestampStats.setMinimum(minimum); timestampStats.setMaximum(maximum); } diff --git a/ql/src/test/queries/clientpositive/orc_merge9.q b/ql/src/test/queries/clientpositive/orc_merge9.q new file mode 100644 index 0000000000000000000000000000000000000000..5f9fcfae7b9c1e4461f3902dd05bd3c37e6211d5 --- /dev/null +++ b/ql/src/test/queries/clientpositive/orc_merge9.q @@ -0,0 +1,21 @@ +create table ts_merge ( +userid bigint, +string1 string, +subtype double, +decimal1 decimal(38,18), +ts timestamp +) stored as orc; + +load data local inpath '../../data/files/orc_split_elim.orc' overwrite into table ts_merge; +load data local inpath '../../data/files/orc_split_elim.orc' into table ts_merge; + +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/ts_merge/; + +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; + +alter table ts_merge concatenate; + +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/ts_merge/; diff --git a/ql/src/test/results/clientpositive/orc_merge9.q.out b/ql/src/test/results/clientpositive/orc_merge9.q.out new file mode 100644 index 0000000000000000000000000000000000000000..31087421ab61fc94965310f891d4611211aa78af --- /dev/null +++ b/ql/src/test/results/clientpositive/orc_merge9.q.out @@ -0,0 +1,48 @@ +PREHOOK: query: create table ts_merge ( +userid bigint, +string1 string, +subtype double, +decimal1 decimal(38,18), +ts timestamp +) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ts_merge +POSTHOOK: query: create table ts_merge ( +userid bigint, +string1 string, +subtype double, +decimal1 decimal(38,18), +ts timestamp +) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ts_merge +PREHOOK: query: load data local inpath '../../data/files/orc_split_elim.orc' overwrite into table ts_merge +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@ts_merge +POSTHOOK: query: load data local inpath '../../data/files/orc_split_elim.orc' overwrite into table ts_merge +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@ts_merge +PREHOOK: query: load data local inpath '../../data/files/orc_split_elim.orc' into table ts_merge +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@ts_merge +POSTHOOK: query: load data local inpath '../../data/files/orc_split_elim.orc' into table ts_merge +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@ts_merge +Found 2 items +#### A masked pattern was here #### +PREHOOK: query: alter table ts_merge concatenate +PREHOOK: type: ALTER_TABLE_MERGE +PREHOOK: Input: default@ts_merge +PREHOOK: Output: default@ts_merge +POSTHOOK: query: alter table ts_merge concatenate +POSTHOOK: type: ALTER_TABLE_MERGE +POSTHOOK: Input: default@ts_merge +POSTHOOK: Output: default@ts_merge +Found 1 items +#### A masked pattern was here ####