From faf57c37a4bfa80eab589a26aca134af5909c460 Mon Sep 17 00:00:00 2001 From: DavydovAA Date: Tue, 11 Jul 2017 16:11:08 +0300 Subject: [PATCH] Implement streaming API to BinaryObject. See #IGNITE-5602 --- modules/core/pom.xml | 7 - .../apache/ignite/binary/BinaryFieldStreamer.java | 111 ++++ .../org/apache/ignite/binary/BinaryObject.java | 8 + .../internal/binary/BinaryEnumObjectImpl.java | 7 + .../internal/binary/BinaryFieldStreamerImpl.java | 191 ++++++ .../ignite/internal/binary/BinaryObjectImpl.java | 7 + .../internal/binary/BinaryObjectOffheapImpl.java | 6 + .../BinaryFieldStreamerAbstractSelfTest.java | 697 +++++++++++++++++++++ .../binary/BinaryFieldStreamerHeapSelfTest.java | 61 ++ .../binary/BinaryFieldStreamerOffeapSelfTest.java | 64 ++ 10 files changed, 1152 insertions(+), 7 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/binary/BinaryFieldStreamer.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldStreamerImpl.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerAbstractSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerHeapSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerOffeapSelfTest.java diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 17e850d..7afe11f 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -159,13 +159,6 @@ - com.google.guava - guava - 14.0.1 - test - - - org.gridgain ignite-shmem 1.0.0 diff --git a/modules/core/src/main/java/org/apache/ignite/binary/BinaryFieldStreamer.java b/modules/core/src/main/java/org/apache/ignite/binary/BinaryFieldStreamer.java new file mode 100644 index 0000000..449fd86 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/binary/BinaryFieldStreamer.java @@ -0,0 +1,111 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.binary; + +import java.io.IOException; +import java.io.InputStream; + +/** + * + * @author a_a_davydov + */ +public interface BinaryFieldStreamer extends Comparable{ + + public static BinaryFieldStreamer NO_ANY_FIELD_ACCESSOR = new BinaryFieldStreamer() { + @Override + public boolean setToField(String fieldName) { + return false; + } + + @Override + public int available() throws IOException { + return 0; + } + + @Override + public int read() throws IOException { + return -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + return -1; + } + + @Override + public long skip(long n) throws IOException { + if (n > 0) { + return -1; + } else { + return 0; + } + } + + @Override + public InputStream asInputStream() { + return new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + } + + @Override + public int compareTo(BinaryFieldStreamer o) { + if (o == null) { + throw new BinaryObjectException("Compare to null"); + } + + if (this == o) { + return 0; + } + + return -o.compareTo(this); + } + }; + + /** + * Setup streamer to read field specified by field name + * + * @param fieldName - name of field to access + * @return true only if field present and field has supported type + */ + public boolean setToField(String fieldName); + + public int available() throws IOException; + + public int read() throws IOException; + + public int read(byte[] b, int off, int len) throws IOException; + + public int read(byte[] b) throws IOException; + + public long skip(long n) throws IOException; + + /** + * Get representation of this streamer as input stream. + * + * @return representation of this steamer as input stream + */ + public InputStream asInputStream(); + +} diff --git a/modules/core/src/main/java/org/apache/ignite/binary/BinaryObject.java b/modules/core/src/main/java/org/apache/ignite/binary/BinaryObject.java index b9e653f..c1577b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/binary/BinaryObject.java +++ b/modules/core/src/main/java/org/apache/ignite/binary/BinaryObject.java @@ -170,4 +170,12 @@ public interface BinaryObject extends Serializable, Cloneable { * @throws BinaryObjectException If object is not enum. */ public String enumName() throws BinaryObjectException; + + /** + * Get reusable BinaryFieldStreamer. Useful for data streaming. + * + * @return new instance of BinaryFieldStreamer backed of this object. + */ + public BinaryFieldStreamer getFieldStreamer(); + } \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java index 12a0fc3..9d3793f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java @@ -42,6 +42,7 @@ import org.jetbrains.annotations.Nullable; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.ignite.internal.processors.cache.CacheObjectAdapter.objectPutSize; +import org.apache.ignite.binary.BinaryFieldStreamer; /** * Binary enum object. @@ -439,4 +440,10 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac public boolean isTypeEquals(final Class cls) { return ctx.descriptorForClass(cls, false).typeId() == typeId(); } + + @Override + public BinaryFieldStreamer getFieldStreamer() { + return BinaryFieldStreamer.NO_ANY_FIELD_ACCESSOR; + } + } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldStreamerImpl.java new file mode 100644 index 0000000..b2b0d51 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldStreamerImpl.java @@ -0,0 +1,191 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.binary; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteOrder; +import org.apache.ignite.binary.BinaryFieldStreamer; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; + +/** + * + * @author a_a_davydov + */ +public final class BinaryFieldStreamerImpl extends InputStream implements BinaryFieldStreamer { + + private int limit = 0; + private final BinaryReaderExImpl reader; + + BinaryFieldStreamerImpl(BinaryReaderExImpl reader) { + this.reader = reader; + } + + /** {@inheritDoc} */ + @Override + public boolean setToField(String fieldName) { + + if (reader.findFieldByName(fieldName)) { + byte flag = reader.readByte(); + + switch (flag) { + /* + //Place to add new supported types. For example: + //case GridBinaryMarshaller.LONG: + // limit = Long.BYTES; + // return true; + */ + case GridBinaryMarshaller.STRING: + case GridBinaryMarshaller.BYTE_ARR: + limit = reader.readInt(); + return true; + } + } + + limit = 0; + return false; + + } + + /** {@inheritDoc} */ + @Override + public int available() throws IOException { + return Math.min(limit, reader.available()); + } + + /** {@inheritDoc} */ + @Override + public int read() throws IOException { + if (limit > 0) { + limit--; + return reader.read(); + } else { + return -1; + } + } + + /** {@inheritDoc} */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + int read = Math.min(limit, len); + + if (read > 0) { + read = reader.read(b, off, read); + + if (read > 0) { + limit = limit - read; + } + + return read; + } else { + return -1; + } + } + + /** {@inheritDoc} */ + @Override + public int read(byte[] b) throws IOException { + return this.read(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override + public long skip(long n) throws IOException { + long skip = Math.min(limit, n); + skip = reader.skip(skip); + limit = (int) (limit - skip); + return skip; + } + + /** {@inheritDoc} */ + @Override + public void close() throws IOException { + } + + /** {@inheritDoc} */ + @Override + public InputStream asInputStream() { + return this; + } + + private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN); + private static final int UNSIGNED_MASK = 0xFF; + + /** {@inheritDoc} */ + @Override + public int compareTo(BinaryFieldStreamer o) { + + if (o == null) { + throw new BinaryObjectException("Compare to null"); + } + + if (this == o) { + return 0; + } + + if(!(o instanceof BinaryFieldStreamerImpl)) { + return limit; // DUMMY implementation has always 0 limit; + } + + BinaryInputStream lIn = this.reader.in(); + BinaryInputStream rIn = ((BinaryFieldStreamerImpl)o).reader.in(); + + int lState = lIn.position(); + int rState = rIn.position(); + + int minLength = Math.min(this.limit, ((BinaryFieldStreamerImpl)o).limit); + int minWords = minLength / Long.BYTES; + + try { + + //comparator logic ported from Guava UnsignedBytes implemetation + for (int i = 0; i < minWords * Long.BYTES; i += Long.BYTES) { + long lw = lIn.readLong(); + long rw = rIn.readLong(); + if (lw != rw) { + + if (BIG_ENDIAN) { + //because binary input stream reverse bytes in this case + //possible it is more efficient way to handle this situation + //but I have no enviroment to test + lw = Long.reverseBytes(lw); + rw = Long.reverseBytes(rw); + return Long.compare(lw ^ Long.MIN_VALUE, rw ^ Long.MIN_VALUE); + } + + int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7; + return (int) (((lw >>> n) & UNSIGNED_MASK) - ((rw >>> n) & UNSIGNED_MASK)); + } + } + + for (int i = minWords * Long.BYTES; i < minLength; i++) { + int result = (lIn.readByte() & UNSIGNED_MASK) - (rIn.readByte() & UNSIGNED_MASK); + if (result != 0) { + return result; + } + } + + return this.limit - ((BinaryFieldStreamerImpl)o).limit; + } finally { + //Set streams to original state + lIn.position(lState); + rIn.position(rState); + } + + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index 1e706ae..457ba4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -852,4 +852,11 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern return super.toString(); } + + + @Override + public BinaryFieldStreamerImpl getFieldStreamer() { + return new BinaryFieldStreamerImpl(reader(null, false)); + } + } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java index 05f6963..acae654 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java @@ -531,4 +531,10 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter rCtx, forUnmarshal); } + + @Override + public BinaryFieldStreamerImpl getFieldStreamer() { + return new BinaryFieldStreamerImpl(reader(null, false)); + } + } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerAbstractSelfTest.java new file mode 100644 index 0000000..1b4a049 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerAbstractSelfTest.java @@ -0,0 +1,697 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.binary; + +import com.google.common.primitives.UnsignedBytes; +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Comparator; +import java.util.concurrent.ThreadLocalRandom; +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import org.apache.ignite.binary.BinaryFieldStreamer; +import org.apache.ignite.binary.BinaryTypeConfiguration; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.marshaller.MarshallerContextTestImpl; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + * @author a_a_davydov + */ +public abstract class BinaryFieldStreamerAbstractSelfTest extends GridCommonAbstractTest { + + /** + * Marshaller. + */ + protected BinaryMarshaller dfltMarsh; + + /** + * Create marshaller. + * + * @return Binary marshaller. + * @throws Exception If failed. + */ + protected BinaryMarshaller createMarshaller() throws Exception { + BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), + new NullLogger()); + + BinaryMarshaller marsh = new BinaryMarshaller(); + + BinaryConfiguration bCfg = new BinaryConfiguration(); + + bCfg.setCompactFooter(compactFooter()); + + bCfg.setTypeConfigurations(Arrays.asList( + new BinaryTypeConfiguration(TestObject.class.getName()), + new BinaryTypeConfiguration(TestOuterObject.class.getName()) + )); + + IgniteConfiguration iCfg = new IgniteConfiguration(); + + iCfg.setBinaryConfiguration(bCfg); + + marsh.setContext(new MarshallerContextTestImpl(null)); + + IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", ctx, iCfg); + + return marsh; + } + + /** + * @return Whether to use compact footer. + */ + protected boolean compactFooter() { + return true; + } + + /** + * Get binary context for the current marshaller. + * + * @param marsh Marshaller. + * @return Binary context. + */ + protected static BinaryContext binaryContext(BinaryMarshaller marsh) { + GridBinaryMarshaller impl = U.field(marsh, "impl"); + + return impl.context(); + } + + /** + * {@inheritDoc} + */ + @Override + protected void beforeTest() throws Exception { + super.beforeTest(); + + dfltMarsh = createMarshaller(); + } + + /** + * Test short byte array field. + * + * @throws Exception If failed. + */ + public void testShortByteArr() throws Exception { + check("shortByteArr"); + } + + /** + * Test long byte array field. + * + * @throws Exception If failed. + */ + public void testLongByteArr() throws Exception { + check("longByteArr"); + } + + /** + * Test short string field. + * + * @throws Exception If failed. + */ + public void testShortString() throws Exception { + check("shortString"); + } + + /** + * Test long string field. + * + * @throws Exception If failed. + */ + public void testLongString() throws Exception { + check("longString"); + } + + /** + * Test streamer is applicable to read some fields. + * + * @throws Exception If failed. + */ + public void testReusage() throws Exception { + check("longString", "shortByteArr", "longByteArr", "shortString"); + } + + /** + * Test streamer process other types correctly. + * + * @throws Exception If failed. + */ + public void testNotSupportedTypes() throws Exception { + check("ignoredValue3", "longString", "shortByteArr", "ignoredValue2", "longByteArr", "shortString", "ignoredValue1"); + } + + /** + * Test streamer process other types correctly. + * + * @throws Exception If failed. + */ + public void testNotexistsFields() throws Exception { + check(new String[]{"nodata3", "longString", "shortByteArr", "nodata2", "longByteArr", "shortString", "nodata1"}, + new boolean[]{false, true, true, false, true, true, false}); + } + + private void checkCompare(byte[] arr1, byte[] arr2) throws Exception { + + int etalon = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(arr1, arr2)); + + CmpTestObject o1 = new CmpTestObject(arr1); + CmpTestObject o2 = new CmpTestObject(arr2); + + BinaryObjectExImpl bin1_1 = toBinary(dfltMarsh, o1); + BinaryObjectExImpl bin1_2 = toBinary(dfltMarsh, o2); + BinaryObjectExImpl bin2_1 = toBinaryCross(dfltMarsh, o1); + BinaryObjectExImpl bin2_2 = toBinaryCross(dfltMarsh, o2); + + BinaryFieldStreamer fieldStreamer1_1 = bin1_1.getFieldStreamer(); + BinaryFieldStreamer fieldStreamer1_2 = bin1_2.getFieldStreamer(); + BinaryFieldStreamer fieldStreamer2_1 = bin2_1.getFieldStreamer(); + BinaryFieldStreamer fieldStreamer2_2 = bin2_2.getFieldStreamer(); + + BinaryFieldStreamer fieldStreamer1_1_1 = bin1_1.getFieldStreamer(); + + assertTrue(fieldStreamer1_1.setToField("byteArr")); + assertTrue(fieldStreamer1_2.setToField("byteArr")); + assertTrue(fieldStreamer2_1.setToField("byteArr")); + assertTrue(fieldStreamer2_2.setToField("byteArr")); + + assertTrue(fieldStreamer1_1_1.setToField("byteArr")); + + assertEquals(etalon, Integer.signum(fieldStreamer1_1.compareTo(fieldStreamer1_2))); + assertEquals(etalon, Integer.signum(fieldStreamer1_1.compareTo(fieldStreamer2_2))); + + assertEquals(-etalon, Integer.signum(fieldStreamer1_2.compareTo(fieldStreamer1_1))); + assertEquals(-etalon, Integer.signum(fieldStreamer1_2.compareTo(fieldStreamer2_1))); + + assertEquals(0, Integer.signum(fieldStreamer1_1.compareTo(fieldStreamer1_1))); + assertEquals(0, Integer.signum(fieldStreamer1_1.compareTo(fieldStreamer2_1))); + + assertEquals(0, Integer.signum(fieldStreamer1_1.compareTo(fieldStreamer1_1_1))); + + } + + public void testCompare() throws Exception { + + ThreadLocalRandom r = ThreadLocalRandom.current(); + + { + byte[] arr1 = new byte[]{0, 0, 0, 0, 0, 0, 0, 0}; + byte[] arr2 = new byte[]{0, 0, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{-128, 0, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, -128, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, -128, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, 0, -128, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, -128, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{-128, 0, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, 0, -128}; + arr2 = new byte[]{0, 0, 0, 0, 0, 0, -128, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, -128, 0}; + arr2 = new byte[]{0, 0, 0, 0, 0, -128, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{127, 0, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, 127, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 127, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, 0, 127, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 127, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{127, 0, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, 0, 127}; + arr2 = new byte[]{0, 0, 0, 0, 0, 0, 127, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, 127, 0}; + arr2 = new byte[]{0, 0, 0, 0, 0, 127, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{15, 0, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, 15, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 15, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{0, 0, 15, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 15, 0, 0, 0, 0, 0, 0}; + arr2 = new byte[]{15, 0, 0, 0, 0, 0, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, 0, 15}; + arr2 = new byte[]{0, 0, 0, 0, 0, 0, 15, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, 0, 0, 0, 0, 0, 15, 0}; + arr2 = new byte[]{0, 0, 0, 0, 0, 15, 0, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{-128, -128, -128, -128, -128, -128, -128, -128}; + arr2 = new byte[]{0, -128, -128, -128, -128, -128, -128, -128}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{0, -128, -128, -128, -128, -128, -128, -128}; + arr2 = new byte[]{0, 0, -128, -128, -128, -128, -128, -128}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{-128, -128, -128, -128, -128, -128, -128, -128}; + arr2 = new byte[]{-128, -128, -128, -128, -128, -128, -128, 0}; + + checkCompare(arr1, arr2); + + arr1 = new byte[]{-128, -128, -128, -128, -128, -128, -128, 0}; + arr2 = new byte[]{-128, -128, -128, -128, -128, -128, 0, 0}; + + checkCompare(arr1, arr2); + + } + + for (int i = 0; i < 100; i++) { + byte[] arr1 = new byte[r.nextInt(32 * 1024)]; + byte[] arr2 = new byte[r.nextInt(32 * 1024)]; + + r.nextBytes(arr2); + r.nextBytes(arr1); + + checkCompare(arr1, arr2); + } + + //equals prefixes + for (int i = 0; i < 10; i++) { + byte[] arr1 = new byte[r.nextInt(32 * 1024)]; + byte[] arr2 = new byte[r.nextInt(32 * 1024)]; + + r.nextBytes(arr2); + r.nextBytes(arr1); + + if (arr1.length < arr2.length) { + System.arraycopy(arr1, 0, arr2, 0, arr1.length / 2); + } else { + System.arraycopy(arr2, 0, arr1, 0, arr2.length / 2); + } + + checkCompare(arr1, arr2); + } + + //equals prefixes + for (int i = 0; i < 10; i++) { + byte[] arr1 = new byte[r.nextInt(32 * 1024)]; + byte[] arr2 = new byte[r.nextInt(32 * 1024)]; + + r.nextBytes(arr2); + r.nextBytes(arr1); + + if (arr1.length < arr2.length) { + System.arraycopy(arr1, 0, arr2, 0, arr1.length - 1); + } else { + System.arraycopy(arr2, 0, arr1, 0, arr2.length - 1); + } + + checkCompare(arr1, arr2); + } + + //equals prefixes + for (int i = 0; i < 10; i++) { + byte[] arr1 = new byte[r.nextInt(32 * 1024)]; + byte[] arr2 = new byte[r.nextInt(32 * 1024)]; + + r.nextBytes(arr2); + r.nextBytes(arr1); + + if (arr1.length < arr2.length) { + System.arraycopy(arr1, 0, arr2, 0, arr1.length); + } else { + System.arraycopy(arr2, 0, arr1, 0, arr2.length); + } + + checkCompare(arr1, arr2); + } + + } + + public void check(String[] fieldNames, boolean[] exists) throws Exception { + checkNormal(dfltMarsh, fieldNames, exists); + checkNested(dfltMarsh, fieldNames, exists); + } + + /** + * Check field resolution in both normal and nested modes. + * + * @param fieldNames Field names. + * @throws Exception If failed. + */ + public void check(String... fieldNames) throws Exception { + boolean[] exists = new boolean[fieldNames.length]; + + for (int i = 0; i < exists.length; i++) { + exists[i] = true; + } + + checkNormal(dfltMarsh, fieldNames, exists); + checkNested(dfltMarsh, fieldNames, exists); + } + + /** + * Check field. + * + * @param marsh Marshaller. + * @param fieldNames Field names. + * @param exists Whether fields should exist. + * @throws Exception If failed. + */ + private void checkNormal(BinaryMarshaller marsh, String[] fieldNames, boolean[] exists) throws Exception { + TestContext testCtx = context(marsh, fieldNames); + check0(testCtx, exists); + } + + /** + * Check nested field. + * + * @param marsh Marshaller. + * @param fieldNames Field names. + * @param exists Whether fields should exist. + * @throws Exception If failed. + */ + private void checkNested(BinaryMarshaller marsh, String[] fieldNames, boolean[] exists) throws Exception { + TestContext testCtx = nestedContext(marsh, fieldNames); + check0(testCtx, exists); + } + + /** + * Internal check routine. + * + * @param ctx Context. + * @param exists Whether fields should exist. + * @throws Exception If failed. + */ + private void check0(TestContext ctx, boolean... exists) throws Exception { + + BinaryFieldStreamer streamer = ctx.portObj.getFieldStreamer(); + + for (int i = 0; i < exists.length; i++) { + String fieldName = ctx.fieldNames[i]; + if (exists[i]) { + Object expVal = U.field(ctx.obj, fieldName); + + if (expVal instanceof String) { + assertTrue(streamer.setToField(fieldName)); + String expString = (String) expVal; + char[] buffer = new char[expString.length()]; + byte expBytes[] = expString.getBytes(StandardCharsets.UTF_8); + byte[] buffer2 = new byte[expBytes.length]; + + assertEquals(expBytes.length, streamer.available()); + + Reader r = new InputStreamReader(new BufferedInputStream(streamer.asInputStream(), 16 * 1024), StandardCharsets.UTF_8); + int read = r.read(buffer); + + assertEquals(expString.length(), read); + assertEquals(0, streamer.available()); + assertEquals(expString, new String(buffer)); + + assertTrue(streamer.setToField(fieldName)); + assertEquals(expBytes.length, streamer.available()); + + read = streamer.read(buffer2); + + assertEquals(expBytes.length, read); + assertEquals(0, streamer.available()); + assertTrue(Arrays.equals(expBytes, buffer2)); + + } else if (expVal instanceof byte[]) { + assertTrue(streamer.setToField(fieldName)); + byte[] expArray = (byte[]) expVal; + byte[] buffer = new byte[expArray.length]; + byte[] buffer2 = new byte[expArray.length]; + + assertEquals(expArray.length, streamer.available()); + + InputStream is = new BufferedInputStream(streamer.asInputStream(), 16 * 1024); + + int read = is.read(buffer); + + assertEquals(expArray.length, read); + assertEquals(0, streamer.available()); + assertTrue(Arrays.equals(expArray, buffer)); + + assertTrue(streamer.setToField(fieldName)); + assertEquals(expArray.length, streamer.available()); + + read = streamer.read(buffer2); + + assertEquals(expArray.length, read); + assertEquals(0, streamer.available()); + assertTrue(Arrays.equals(expArray, buffer2)); + + } else { + assertFalse(streamer.setToField(fieldName)); + assertEquals(0, streamer.available()); + } + } else { + assertFalse(streamer.setToField(fieldName)); + assertEquals(0, streamer.available()); + } + } + + } + + /** + * Get test context. + * + * @param marsh Binary marshaller. + * @param fieldName Field name. + * @return Test context. + * @throws Exception If failed. + */ + private TestContext context(BinaryMarshaller marsh, String... fieldNames) throws Exception { + TestObject obj = createObject(); + + BinaryObjectExImpl portObj = toBinary(marsh, obj); + + return new TestContext(obj, portObj, fieldNames); + } + + /** + * Get test context with nested test object. + * + * @param marsh Binary marshaller. + * @param fieldName Field name. + * @return Test context. + * @throws Exception If failed. + */ + private TestContext nestedContext(BinaryMarshaller marsh, String... fieldNames) + throws Exception { + TestObject obj = createObject(); + TestOuterObject outObj = new TestOuterObject(obj); + + BinaryObjectExImpl portOutObj = toBinary(marsh, outObj); + BinaryObjectExImpl portObj = portOutObj.field("fInner"); + + assert portObj != null; + + return new TestContext(obj, portObj, fieldNames); + } + + /** + * Create test object. + * + * @return Test object. + */ + private TestObject createObject() { + return new TestObject(0); + } + + /** + * Convert object to binary object. + * + * @param marsh Marshaller. + * @param obj Object. + * @return Binary object. + * @throws Exception If failed. + */ + protected abstract BinaryObjectExImpl toBinary(BinaryMarshaller marsh, Object obj) throws Exception; + + /** + * Convert object to binary object (Use different memory allocation then + * toBinary). + * + * @param marsh Marshaller. + * @param obj Object. + * @return Binary object. + * @throws Exception If failed. + */ + protected abstract BinaryObjectExImpl toBinaryCross(BinaryMarshaller marsh, Object obj) throws Exception; + + /** + * Outer test object. + */ + @SuppressWarnings("UnusedDeclaration") + public static class TestOuterObject { + + /** + * Inner object. + */ + public TestObject fInner; + + /** + * Default constructor. + */ + public TestOuterObject() { + // No-op. + } + + /** + * Constructor. + * + * @param fInner Inner object. + */ + public TestOuterObject(TestObject fInner) { + this.fInner = fInner; + } + } + + @SuppressWarnings("UnusedDeclaration") + public static class CmpTestObject { + + long ignoredValue1; + public byte[] byteArr; + long ignoredValue2; + + public CmpTestObject() { + } + + public CmpTestObject(byte[] byteArr) { + long ignoredValue1; + this.byteArr = byteArr; + long ignoredValue2; + } + + } + + /** + * Test object class, c + */ + @SuppressWarnings("UnusedDeclaration") + public static class TestObject { + + long ignoredValue1; + public byte[] shortByteArr; + long ignoredValue2; + public byte[] longByteArr; + long ignoredValue3; + public String shortString; + long ignoredValue4; + public String longString; + long ignoredValue5; + + /** + * Default constructor. + */ + public TestObject() { + // No-op. + } + + /** + * Non-default constructor. + * + * @param ignore Ignored. + */ + public TestObject(int ignore) { + ignoredValue1 = 42; + shortByteArr = new byte[32]; + ThreadLocalRandom.current().nextBytes(shortByteArr); + ignoredValue2 = 33; + longByteArr = new byte[16 * 1024]; + ThreadLocalRandom.current().nextBytes(longByteArr); + ignoredValue3 = -33; + shortString = new BigInteger(130, ThreadLocalRandom.current()).toString(32) + "testLongCharsПроверим+всякие=знаки"; + ignoredValue4 = -42; + longString = "testLongCharsПроверим+всякие=знаки" + new BigInteger(16 * 1024 * 5, ThreadLocalRandom.current()).toString(32); + ignoredValue5 = 120585; + } + } + + /** + * Test context. + */ + public static class TestContext { + + /** + * Object. + */ + public final TestObject obj; + + /** + * Binary object. + */ + public final BinaryObjectExImpl portObj; + public final String[] fieldNames; + + /** + * Constructor. + * + * @param obj Object. + * @param portObj Binary object. + */ + public TestContext(TestObject obj, BinaryObjectExImpl portObj, String... fieldNames) { + this.obj = obj; + this.portObj = portObj; + this.fieldNames = fieldNames; + } + } + +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerHeapSelfTest.java new file mode 100644 index 0000000..c2db754 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerHeapSelfTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.binary; + +import static org.apache.ignite.internal.binary.BinaryFieldStreamerAbstractSelfTest.binaryContext; +import org.apache.ignite.internal.util.GridUnsafe; +import org.eclipse.jetty.util.ConcurrentHashSet; + +/** + * + * @author a_a_davydov + */ +public class BinaryFieldStreamerHeapSelfTest extends BinaryFieldStreamerAbstractSelfTest{ + + /** Allocated unsafe pointer. */ + private final ConcurrentHashSet ptrs = new ConcurrentHashSet<>(); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Cleanup allocated objects. + for (Long ptr : ptrs) + GridUnsafe.freeMemory(ptr); + + ptrs.clear(); + } + + /** {@inheritDoc} */ + @Override protected BinaryObjectExImpl toBinaryCross(BinaryMarshaller marsh, Object obj) throws Exception { + byte[] arr = marsh.marshal(obj); + + long ptr = GridUnsafe.allocateMemory(arr.length); + + ptrs.add(ptr); + + GridUnsafe.copyHeapOffheap(arr, GridUnsafe.BYTE_ARR_OFF, ptr, arr.length); + + return new BinaryObjectOffheapImpl(binaryContext(marsh), ptr, 0, arr.length); + } + + /** {@inheritDoc} */ + @Override protected BinaryObjectExImpl toBinary(BinaryMarshaller marsh, Object obj) throws Exception { + byte[] bytes = marsh.marshal(obj); + + return new BinaryObjectImpl(binaryContext(marsh), bytes, 0); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerOffeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerOffeapSelfTest.java new file mode 100644 index 0000000..1128f6e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryFieldStreamerOffeapSelfTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.binary; + +import static org.apache.ignite.internal.binary.BinaryFieldStreamerAbstractSelfTest.binaryContext; +import static org.apache.ignite.internal.binary.BinaryFieldsAbstractSelfTest.binaryContext; +import org.apache.ignite.internal.util.GridUnsafe; +import org.eclipse.jetty.util.ConcurrentHashSet; + +/** + * + * @author a_a_davydov + */ +public class BinaryFieldStreamerOffeapSelfTest extends BinaryFieldStreamerAbstractSelfTest{ + + /** Allocated unsafe pointer. */ + private final ConcurrentHashSet ptrs = new ConcurrentHashSet<>(); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Cleanup allocated objects. + for (Long ptr : ptrs) + GridUnsafe.freeMemory(ptr); + + ptrs.clear(); + } + + /** {@inheritDoc} */ + @Override protected BinaryObjectExImpl toBinary(BinaryMarshaller marsh, Object obj) throws Exception { + byte[] arr = marsh.marshal(obj); + + long ptr = GridUnsafe.allocateMemory(arr.length); + + ptrs.add(ptr); + + GridUnsafe.copyHeapOffheap(arr, GridUnsafe.BYTE_ARR_OFF, ptr, arr.length); + + return new BinaryObjectOffheapImpl(binaryContext(marsh), ptr, 0, arr.length); + } + + /** {@inheritDoc} */ + @Override protected BinaryObjectExImpl toBinaryCross(BinaryMarshaller marsh, Object obj) throws Exception { + byte[] bytes = marsh.marshal(obj); + + return new BinaryObjectImpl(binaryContext(marsh), bytes, 0); + } + + +} -- 2.10.0.windows.1