diff --git common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
index a9f5ed6..7133514 100644
--- common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
+++ common/src/java/org/apache/hadoop/hive/common/io/NonSyncByteArrayOutputStream.java
@@ -67,18 +67,20 @@ public void write(int b) {
     count += 1;
   }
 
-  private int enLargeBuffer(int increment) {
-    int temp = count + increment;
-    int newLen = temp;
-    if (temp > buf.length) {
-      if ((buf.length << 1) > temp) {
+  protected final void enLargeBuffer(int increment) {
+    ensureTo(count + increment);
+  }
+
+  protected final void ensureTo(int position) {
+    if (position > buf.length) {
+      int newLen = position;
+      if ((buf.length << 1) > position) {
         newLen = buf.length << 1;
       }
-      byte newbuf[] = new byte[newLen];
+      byte[] newbuf = new byte[newLen];
       System.arraycopy(buf, 0, newbuf, 0, count);
       buf = newbuf;
     }
-    return newLen;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index e33c1d4..c1406aa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -24,10 +24,8 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -39,6 +37,7 @@
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.IndexedSerializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
@@ -109,7 +108,7 @@
    */
   protected transient ExprNodeEvaluator[] bucketEval = null;
   // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
-  protected transient Serializer keySerializer;
+  protected transient IndexedSerializer keySerializer;
   protected transient boolean keyIsText;
   protected transient Serializer valueSerializer;
   protected transient byte[] tagByte = new byte[1];
@@ -131,14 +130,14 @@
    * This two dimensional array holds key data and a corresponding Union object
    * which contains the tag identifying the aggregate expression for distinct columns.
    *
-   * If there is no distict expression, cachedKeys is simply like this.
+   * If there is no distinct expression, cachedKeys is simply like this.
    * cachedKeys[0] = [col0][col1]
    *
-   * with two distict expression, union(tag:key) is attatched for each distinct expression
+   * with two distinct expressions, union(tag:key) is attached for each distinct expression
    * cachedKeys[0] = [col0][col1][0:dist1]
    * cachedKeys[1] = [col0][col1][1:dist2]
    *
-   * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
+   * in this case, child GBY evaluates distinct values with an expression like KEY.col2:0.dist1
    * see {@link ExprNodeColumnEvaluator}
    */
   // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
@@ -151,6 +150,9 @@
   private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
 
   protected transient long numRows = 0;
+  protected transient long prevNumRows = 0;
+  protected transient long prevLogTime = 0;
+  protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
 
   private final transient LongWritable recordCounter = new LongWritable();
@@ -222,7 +224,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       }
 
       TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+      keySerializer = (IndexedSerializer) keyTableDesc.getDeserializerClass()
          .newInstance();
       keySerializer.initialize(null, keyTableDesc.getProperties());
       keyIsText = keySerializer.getSerializedClass().equals(Text.class);
@@ -307,7 +309,7 @@ public void process(Object row, int tag) throws HiveException {
       if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
           conf.getWriteType() == AcidUtils.Operation.DELETE) {
         assert rowInspector instanceof StructObjectInspector :
-            "Exptected rowInspector to be instance of StructObjectInspector but it is a " +
+            "Expected rowInspector to be instance of StructObjectInspector but it is a " +
             rowInspector.getClass().getName();
         acidRowInspector = (StructObjectInspector)rowInspector;
         // The record identifier is always in the first column
@@ -334,7 +336,12 @@ public void process(Object row, int tag) throws HiveException {
         int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
         int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
         cachedKeys = new Object[numKeys][keyLen];
+        for (int i = 0; i < numDistinctExprs; i++) {
+          cachedKeys[i][numDistributionKeys] = new StandardUnion((byte) i,
+              new Object[distinctColIndices.get(i).size()]);
+        }
         cachedValues = new Object[valueEval.length];
+        prevLogTime = System.currentTimeMillis();
       }
 
       // Determine distKeyLength (w/o distincts), and then add the first if present.
@@ -351,13 +358,12 @@ public void process(Object row, int tag) throws HiveException {
         bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
       }
 
-      HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
-      int distKeyLength = firstKey.getDistKeyLength();
       if (numDistinctExprs > 0) {
         populateCachedDistinctKeys(row, 0);
-        firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
       }
 
+      HiveKey firstKey = toHiveKey(cachedKeys[0], tag);
+
       final int hashCode;
 
       // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
@@ -392,7 +398,7 @@ public void process(Object row, int tag) throws HiveException {
         for (int i = 1; i < numDistinctExprs; i++) {
           System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
           populateCachedDistinctKeys(row, i);
-          HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+          HiveKey hiveKey = toHiveKey(cachedKeys[i], tag);
           hiveKey.setHashCode(hashCode);
           collect(hiveKey, value);
         }
@@ -428,9 +434,6 @@ private void populateCachedDistributionKeys(Object row, int index) throws HiveEx
     for (int i = 0; i < numDistributionKeys; i++) {
       cachedKeys[index][i] = keyEval[i].evaluate(row);
     }
-    if (cachedKeys[0].length > numDistributionKeys) {
-      cachedKeys[index][numDistributionKeys] = null;
-    }
   }
 
   /**
@@ -439,15 +442,12 @@ private void populateCachedDistributionKeys(Object row, int index) throws HiveEx
    * @param index the cachedKeys index to write to
    */
   private void populateCachedDistinctKeys(Object row, int index) throws HiveException {
-    StandardUnion union;
-    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
-        (byte)index, new Object[distinctColIndices.get(index).size()]);
+    StandardUnion union = (StandardUnion) cachedKeys[index][numDistributionKeys];
     Object[] distinctParameters = (Object[]) union.getObject();
     for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
       distinctParameters[distinctParamI] =
           keyEval[distinctColIndices.get(index).get(distinctParamI)].evaluate(row);
     }
-    union.setTag((byte) index);
   }
 
   protected final int computeMurmurHash(HiveKey firstKey) {
@@ -499,17 +499,21 @@ private boolean partitionKeysAreNull(Object row) throws HiveException {
   }
 
   // Serialize the keys and append the tag
-  protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
-    BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
+  protected HiveKey toHiveKey(Object obj, int tag) throws SerDeException {
+    BinaryComparable key = keySerializer.serialize(obj, keyObjectInspector);
     int keyLength = key.getLength();
     if (tag == -1 || skipTag) {
       keyWritable.set(key.getBytes(), 0, keyLength);
     } else {
       keyWritable.setSize(keyLength + 1);
-      System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-      keyWritable.get()[keyLength] = tagByte[0];
+      System.arraycopy(key.getBytes(), 0, keyWritable.getBytes(), 0, keyLength);
+      keyWritable.getBytes()[keyLength] = tagByte[0];
     }
-    keyWritable.setDistKeyLength((distLength == null) ? keyLength : distLength);
+    int distKeyLength = keyLength;
+    if (numDistributionKeys > 0) {
+      distKeyLength = keySerializer.getIndex()[numDistributionKeys - 1];
+    }
+    keyWritable.setDistKeyLength(distKeyLength);
     return keyWritable;
   }
 
@@ -520,20 +524,25 @@ public void collect(byte[] key, byte[] value, int hash) throws IOException {
     collect(keyWritable, valueWritable);
   }
 
+  @SuppressWarnings("unchecked")
   protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
     if (null != out) {
       numRows++;
-      if (isLogInfoEnabled) {
-        if (numRows == cntr) {
-          cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
-          if (cntr < 0 || numRows < 0) {
-            cntr = 0;
-            numRows = 1;
-          }
-          LOG.info(toString() + ": records written - " + numRows);
+      if (isLogInfoEnabled && numRows == cntr) {
+        cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+        long current = System.currentTimeMillis();
+        if (cntr < 0 || numRows < 0) {
+          LOG.warn("Row counter exceeded limit of Long.MAX. Further logging will be disabled");
+          cntr = 0;
+          numRows = 1;
+        } else {
+          LOG.info(toString() + ": total records written - " + numRows + ", " +
+              (numRows - prevNumRows) + " rows for " + (current - prevLogTime) + " msec");
         }
+        prevNumRows = numRows;
+        prevLogTime = current;
       }
       out.collect(keyWritable, valueWritable);
     }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
index 7916a6f..e3bb45b 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/ByteStream.java
@@ -23,9 +23,6 @@
 
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayInputStream;
 import org.apache.hadoop.hive.common.io.NonSyncByteArrayOutputStream;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 /**
  * Extensions to bytearrayinput/output streams.
@@ -67,12 +64,8 @@ public Input(byte[] buf, int offset, int length) {
    * Output.
    *
    */
-  public static final class Output
+  public static class Output
       extends NonSyncByteArrayOutputStream implements RandomAccessOutput {
-    @Override
-    public byte[] getData() {
-      return buf;
-    }
 
     public Output() {
       super();
@@ -84,23 +77,26 @@ public Output(int size) {
 
     @Override
     public void writeInt(long offset, int value) {
+      ensureTo((int)offset + 4);
       int offset2 = (int)offset;
-      getData()[offset2++] = (byte) (value >> 24);
-      getData()[offset2++] = (byte) (value >> 16);
-      getData()[offset2++] = (byte) (value >> 8);
-      getData()[offset2] = (byte) (value);
+      buf[offset2++] = (byte) (value >> 24);
+      buf[offset2++] = (byte) (value >> 16);
+      buf[offset2++] = (byte) (value >> 8);
+      buf[offset2] = (byte) (value);
     }
 
     @Override
     public void writeByte(long offset, byte value) {
-      getData()[(int) offset] = value;
+      ensureTo((int)offset + 1);
+      buf[(int) offset] = value;
     }
 
     @Override
     public void reserve(int byteCount) {
-      for (int i = 0; i < byteCount; ++i) {
-        write(0);
-      }
+      int newPos = count + byteCount;
+      ensureTo(newPos);
+      Arrays.fill(buf, count, newPos, (byte)0);
+      count = newPos;
     }
 
     public boolean arraysEquals(Output output) {
@@ -116,6 +112,26 @@ public boolean arraysEquals(Output output) {
     }
   }
 
+  public static class IndexedOutput extends Output {
+    private final int[] index;
+    private int counter;
+    public IndexedOutput(int numFields) {
+      index = new int[numFields];
+    }
+    public void mark() {
+      index[counter++] = count;
+    }
+    @Override
+    public void reset() {
+      super.reset();
+      counter = 0;
+      Arrays.fill(index, -1);
+    }
+    public int[] getIndex() {
+      return index;
+    }
+  }
+
   public static interface RandomAccessOutput {
     public void writeByte(long offset, byte value);
     public void writeInt(long offset, int value);
diff --git serde/src/java/org/apache/hadoop/hive/serde2/IndexedSerializer.java serde/src/java/org/apache/hadoop/hive/serde2/IndexedSerializer.java
new file mode 100644
index 0000000..2ff4e4f
--- /dev/null
+++ serde/src/java/org/apache/hadoop/hive/serde2/IndexedSerializer.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+public interface IndexedSerializer extends Serializer {
+
+  // end offset of each serialized column in the underlying BytesWritable
+  int[] getIndex();
+
+  @Override
+  BytesWritable serialize(Object obj, ObjectInspector objInspector) throws SerDeException;
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
index 313b5f6..8529ff6 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.IndexedSerializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -119,7 +120,7 @@
 @SerDeSpec(schemaProps = {
     serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES,
     serdeConstants.SERIALIZATION_SORT_ORDER})
-public class BinarySortableSerDe extends AbstractSerDe {
+public class BinarySortableSerDe extends AbstractSerDe implements IndexedSerializer {
 
   public static final Log LOG = LogFactory.getLog(BinarySortableSerDe.class.getName());
 
@@ -171,6 +172,8 @@ public void initialize(Configuration conf, Properties tbl)
       columnSortOrderIsDesc[i] = (columnSortOrder != null && columnSortOrder
          .charAt(i) == '-');
     }
+
+    output = new ByteStream.IndexedOutput(columnNames.size());
   }
 
   @Override
@@ -619,10 +622,10 @@ public static Text deserializeText(InputByteBuffer buffer, boolean invert, Text
   }
 
   BytesWritable serializeBytesWritable = new BytesWritable();
-  ByteStream.Output output = new ByteStream.Output();
+  ByteStream.IndexedOutput output;
 
   @Override
-  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+  public BytesWritable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
     output.reset();
     StructObjectInspector soi = (StructObjectInspector) objInspector;
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
@@ -630,6 +633,7 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe
     for (int i = 0; i < columnNames.size(); i++) {
       serialize(output, soi.getStructFieldData(obj, fields.get(i)),
          fields.get(i).getFieldObjectInspector(), columnSortOrderIsDesc[i]);
+      output.mark();
     }
 
     serializeBytesWritable.set(output.getData(), 0, output.getLength());
@@ -973,6 +977,11 @@ public SerDeStats getSerDeStats() {
     return null;
   }
 
+  @Override
+  public int[] getIndex() {
+    return output.getIndex();
+  }
+
   public static void serializeStruct(Output byteStream, Object[] fieldData,
       List<ObjectInspector> fieldOis, boolean[] sortableSortOrders) throws SerDeException {
     for (int i = 0; i < fieldData.length; i++) {
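
Illustrative usage sketch (not part of the patch; the driver class, column sizes, and printed output below are hypothetical): it shows how ByteStream.IndexedOutput records a per-column end offset on each mark() call, and how the getIndex() lookup exposed through IndexedSerializer is the same lookup ReduceSinkOperator.toHiveKey() now uses to obtain distKeyLength without serializing the key a second time.

// Hypothetical driver, assuming the classes added by this patch are on the classpath.
import org.apache.hadoop.hive.serde2.ByteStream;

public class IndexedOutputSketch {
  public static void main(String[] args) {
    // Pretend we serialize three columns of 2, 3 and 1 bytes respectively.
    ByteStream.IndexedOutput out = new ByteStream.IndexedOutput(3);
    out.write(0x01); out.write(0x02);                   // column 0
    out.mark();                                         // index[0] = 2
    out.write(0x03); out.write(0x04); out.write(0x05);  // column 1
    out.mark();                                         // index[1] = 5
    out.write(0x06);                                    // column 2
    out.mark();                                         // index[2] = 6
    // With two distribution keys, the distribution-key prefix ends at index[1],
    // which is the value toHiveKey() passes to keyWritable.setDistKeyLength(...).
    int numDistributionKeys = 2;
    int distKeyLength = out.getIndex()[numDistributionKeys - 1];
    System.out.println("distKeyLength = " + distKeyLength); // prints 5
  }
}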