From f872a0b1ddbbb79ab2f080a3470358ad306749e5 Mon Sep 17 00:00:00 2001 From: zhengdong Date: Mon, 21 Aug 2017 15:01:41 +0800 Subject: [PATCH] KYLIN-2799 Building cube with percentile measure encounter with NullPointerException --- .../measure/percentile/PercentileCounter.java | 28 ++++----- .../measure/percentile/PercentileCounterTest.java | 46 --------------- .../kylin/engine/spark/KylinKryoRegistrator.java | 10 ++-- .../spark/util/PercentileCounterSerializer.java | 55 ++++++++++++++++++ .../spark/util/PercentileSerializerTest.java | 67 ++++++++++++++++++++++ 5 files changed, 136 insertions(+), 70 deletions(-) mode change 100644 => 100755 core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java mode change 100644 => 100755 core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java mode change 100644 => 100755 engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java create mode 100755 engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java create mode 100755 engine-spark/src/test/java/org/apache/kylin/engine/spark/util/PercentileSerializerTest.java diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java old mode 100644 new mode 100755 index f86a796b9..89fa956ca --- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java @@ -18,9 +18,6 @@ package org.apache.kylin.measure.percentile; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.ByteBuffer; @@ -98,20 +95,15 @@ public class PercentileCounter implements Serializable { reInitRegisters(); } - private void writeObject(ObjectOutputStream out) throws IOException { - registers.compress(); - int bound = registers.byteSize(); - ByteBuffer buf = ByteBuffer.allocate(bound); - registers.asSmallBytes(buf); - out.defaultWriteObject(); - out.writeInt(bound); - out.write(buf.array(), 0, bound); - } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - int bound = in.readInt(); - ByteBuffer buf = ByteBuffer.allocate(bound); - in.read(buf.array(), 0, bound); - registers = AVLTreeDigest.fromBytes(buf); + public double getCompression() { + return compression; + } + + public double getQuantileRatio() { + return quantileRatio; + } + + public TDigest getRegisters() { + return registers; } } diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java old mode 100644 new mode 100755 index 94a1233d7..de2692ee4 --- a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java @@ -20,19 +20,11 @@ package org.apache.kylin.measure.percentile; import static org.junit.Assert.assertEquals; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.Collections; import java.util.List; import java.util.Random; -import org.apache.commons.io.IOUtils; import org.apache.kylin.common.util.MathUtil; -import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Lists; @@ -85,42 +77,4 @@ public class PercentileCounterTest { assertEquals(expectedResult, actualResult, 0); } - @Test - public void testSerialization() { - double compression = 100; - double quantile = 0.5; - ByteArrayOutputStream os = new ByteArrayOutputStream(1024); - ObjectOutputStream out = null; - PercentileCounter origin_counter = null; - try { - out = new ObjectOutputStream(os); - - origin_counter = new PercentileCounter(compression, quantile); - out.writeObject(origin_counter); - - } catch (IOException e) { - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(out); - } - - InputStream is = new ByteArrayInputStream(os.toByteArray()); - PercentileCounter serialized_counter = null; - ObjectInputStream in = null; - try { - in = new ObjectInputStream(is); - serialized_counter = (PercentileCounter)in.readObject(); - - Assert.assertNotNull(serialized_counter); - Assert.assertNotNull(serialized_counter.registers); - } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } finally { - IOUtils.closeQuietly(os); - IOUtils.closeQuietly(is); - } - - } } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java old mode 100644 new mode 100755 index ac5607594..9d1d09b6e --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java @@ -21,6 +21,8 @@ package org.apache.kylin.engine.spark; import java.util.LinkedHashSet; import java.util.Set; +import org.apache.kylin.engine.spark.util.PercentileCounterSerializer; +import org.apache.kylin.measure.percentile.PercentileCounter; import org.apache.spark.serializer.KryoRegistrator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,8 +93,7 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.roaringbitmap.buffer.MutableRoaringBitmap.class); kyroClasses.add(org.roaringbitmap.buffer.MappeableArrayContainer.class); kyroClasses.add(org.roaringbitmap.buffer.MappeableBitmapContainer.class); - kyroClasses.add(com.tdunning.math.stats.AVLTreeDigest.class); - kyroClasses.add(com.tdunning.math.stats.Centroid.class); + addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList"); addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer"); @@ -100,14 +101,12 @@ public class KylinKryoRegistrator implements KryoRegistrator { addClassQuitely(kyroClasses, "scala.collection.immutable.Map$EmptyMap$"); addClassQuitely(kyroClasses, "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"); addClassQuitely(kyroClasses, "org.apache.spark.unsafe.types.UTF8String"); - addClassQuitely(kyroClasses, "com.tdunning.math.stats.AVLGroupTree"); for (Class kyroClass : kyroClasses) { kryo.register(kyroClass); } - // TODO: should use JavaSerializer for PercentileCounter after Kryo bug be fixed: https://github.com/EsotericSoftware/kryo/issues/489 - // kryo.register(PercentileCounter.class, new JavaSerializer()); + kryo.register(PercentileCounter.class, new PercentileCounterSerializer()); } /** @@ -237,7 +236,6 @@ public class KylinKryoRegistrator implements KryoRegistrator { kyroClasses.add(org.apache.kylin.measure.hllc.SingleValueRegister.class); kyroClasses.add(org.apache.kylin.measure.hllc.SparseRegister.class); kyroClasses.add(org.apache.kylin.measure.percentile.PercentileAggregator.class); - kyroClasses.add(org.apache.kylin.measure.percentile.PercentileCounter.class); kyroClasses.add(org.apache.kylin.measure.percentile.PercentileMeasureType.class); kyroClasses.add(org.apache.kylin.measure.percentile.PercentileSerializer.class); kyroClasses.add(org.apache.kylin.measure.raw.RawAggregator.class); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java new file mode 100755 index 000000000..a43a7336e --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.kylin.engine.spark.util; + +import java.nio.ByteBuffer; + +import org.apache.kylin.measure.percentile.PercentileCounter; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class PercentileCounterSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, PercentileCounter counter) { + int length = counter.getRegisters().byteSize(); + ByteBuffer buffer = ByteBuffer.allocate(length); + counter.getRegisters().asSmallBytes(buffer); + output.writeDouble(counter.getCompression()); + output.writeDouble(counter.getQuantileRatio()); + output.writeInt(buffer.position()); + output.write(buffer.array(), 0, buffer.position()); + } + + @Override + public PercentileCounter read(Kryo kryo, Input input, Class type) { + double compression = input.readDouble(); + double quantileRatio = input.readDouble(); + int length = input.readInt(); + byte[] buffer = new byte[length]; + input.read(buffer); + PercentileCounter counter = new PercentileCounter(compression, quantileRatio); + counter.readRegisters(ByteBuffer.wrap(buffer)); + return counter; + } +} diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/PercentileSerializerTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/PercentileSerializerTest.java new file mode 100755 index 000000000..eb5002ca3 --- /dev/null +++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/PercentileSerializerTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.kylin.engine.spark.util; + +import org.apache.kylin.measure.percentile.PercentileCounter; +import org.junit.Assert; +import org.junit.Test; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class PercentileSerializerTest { + + @Test + public void testSerialization() { + Kryo kryo = new Kryo(); + kryo.register(PercentileCounter.class, new PercentileCounterSerializer()); + double compression = 100; + double quantile = 0.8; + PercentileCounter origin_counter = new PercentileCounter(compression, quantile); + for (int i = 1; i < 10; i++) { + origin_counter.add(i); + } + byte[] buffer = serialize(kryo, origin_counter); + PercentileCounter deserialized_counter = deserialize(kryo, buffer, PercentileCounter.class); + Assert.assertEquals("Compression Error", origin_counter.getCompression(), deserialized_counter.getCompression(), + 0.00000001); + Assert.assertEquals("QuantileRatio Error", origin_counter.getQuantileRatio(), + deserialized_counter.getQuantileRatio(), 0.00000001); + Assert.assertEquals("Estimation Error", origin_counter.getResultEstimate(), + deserialized_counter.getResultEstimate(), 0.00000001); + } + + public static T deserialize(final Kryo kryo, final byte[] in, final Class clazz) { + final Input input = new Input(in); + return kryo.readObject(input, clazz); + } + + public static byte[] serialize(final Kryo kryo, final Object o) { + if (o == null) { + throw new NullPointerException("Can't serialize null"); + } + final Output output = new Output(4096); + kryo.writeObject(output, o); + output.flush(); + return output.getBuffer(); + } + +} -- 2.14.1