Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java (revision ) @@ -20,9 +20,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -30,9 +33,6 @@ import org.h2.value.Value; import org.h2.value.ValueArray; -import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray; -import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessage; - /** * H2 Array. */ @@ -42,7 +42,7 @@ private Collection x; /** - * + * Constructor. */ public GridH2Array() { // No-op. @@ -52,7 +52,7 @@ * @param val Value. * @throws IgniteCheckedException If failed. */ - public GridH2Array(Value val) throws IgniteCheckedException { + public GridH2Array(GridKernalContext ctx, Value val) throws IgniteCheckedException { assert val.getType() == Value.ARRAY : val.getType(); ValueArray arr = (ValueArray)val; @@ -60,12 +60,22 @@ x = new ArrayList<>(arr.getList().length); for (Value v : arr.getList()) - x.add(toMessage(v)); + x.add(H2Utils.toMessage(ctx, v)); } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) throws IgniteCheckedException { - return ValueArray.get(fillArray(x.iterator(), new Value[x.size()], ctx)); + @Override public Value value() throws IgniteCheckedException { + Iterator src = x.iterator(); + + Value[] dst = new Value[x.size()]; + + for (int i = 0; i < dst.length; i++) { + Message msg = src.next(); + + dst[i] = ((GridH2ValueMessage)msg).value(); + } + + return ValueArray.get(dst); } /** {@inheritDoc} */ Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -50,7 +50,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueJavaObject.getNoCopy(null, b, null); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -39,7 +39,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueNull.INSTANCE; } Index: modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java (revision ) @@ -17,11 +17,6 @@ package org.apache.ignite.internal.direct; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -34,8 +29,15 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.jetbrains.annotations.Nullable; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + /** * Message reader implementation. */ @@ -328,9 +330,15 @@ /** {@inheritDoc} */ @Override public > C readCollection(String name, MessageCollectionItemType itemType) { + return readCollection(name, itemType, null); + } + + /** {@inheritDoc} */ + @Override public > C readCollection(String name, MessageCollectionItemType itemType, + @Nullable MessageConverter converter) { DirectByteBufferStream stream = state.item().stream; - C col = stream.readCollection(itemType, this); + C col = stream.readCollection(itemType, this, converter); lastRead = stream.lastFinished(); Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java (revision ) @@ -317,6 +317,9 @@ for (IgniteComponentType compType : IgniteComponentType.values()) { MessageFactory f = compType.messageFactory(); + if (f instanceof MessageFactoryEx) + ((MessageFactoryEx)f).kernalContext(ctx); + if (f != null) compMsgs.add(f); } @@ -324,7 +327,7 @@ if (!compMsgs.isEmpty()) msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()])); - msgFactory = new GridIoMessageFactory(msgs); + msgFactory = new GridIoMessageFactory(ctx, msgs); if (log.isDebugEnabled()) log.debug(startInfo()); Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -50,7 +50,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueDate.fromDateValue(date); } Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageFactoryEx.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageFactoryEx.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageFactoryEx.java (revision ) @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** + * Extended message faactory interface. + */ +public interface MessageFactoryEx extends MessageFactory { + /** + * Set kernal context. + * + * @param ctx Kernal context. + */ + void kernalContext(GridKernalContext ctx); +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision ) @@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageConverter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -134,6 +135,7 @@ import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.h2.api.ErrorCode; @@ -307,6 +309,9 @@ /** */ protected volatile GridKernalContext ctx; + /** Message converter. */ + private volatile GridH2ValueMessageConverter msgConverter; + /** Cache object value context. */ protected CacheQueryObjectValueContext valCtx; @@ -1754,6 +1759,11 @@ return prep instanceof Insert; } + /** {@inheritDoc} */ + @Override public MessageConverter messageConverter() { + return msgConverter; + } + /** * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}. */ @@ -1904,6 +1914,8 @@ } else { this.ctx = ctx; + + msgConverter = new GridH2ValueMessageConverter(ctx); schemas.put(QueryUtils.DFLT_SCHEMA, new H2Schema(QueryUtils.DFLT_SCHEMA)); Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java (revision ) @@ -287,7 +287,7 @@ GridResultPage page; try { - page = new GridResultPage(ctx, node.id(), msg) { + page = new GridResultPage(node.id(), msg) { @Override public void fetchNextPage() { Object errState = r.state(); \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java (revision ) @@ -31,28 +31,35 @@ * H2 Cache object message. */ public class GridH2CacheObject extends GridH2ValueMessage { + /** Kernal context. */ + private GridKernalContext ctx; + /** */ private CacheObject obj; /** - * + * Constructor. */ - public GridH2CacheObject() { - // No-op. + public GridH2CacheObject(GridKernalContext ctx) { + this.ctx = ctx; } /** + * Constructor. + * + * @param ctx Kernal context. * @param v Value. * @throws IgniteCheckedException If failed. */ - public GridH2CacheObject(GridH2ValueCacheObject v) throws IgniteCheckedException { + public GridH2CacheObject(GridKernalContext ctx, GridH2ValueCacheObject v) throws IgniteCheckedException { + this.ctx = ctx; this.obj = v.getCacheObject(); obj.prepareMarshal(v.valueContext()); } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) throws IgniteCheckedException { + @Override public Value value() throws IgniteCheckedException { CacheObjectValueContext valCtx = ctx.query().objectContext(); obj.finishUnmarshal(valCtx, ctx.cache().context().deploy().globalLoader()); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (revision ) @@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; @@ -295,4 +296,9 @@ * @return {@code True} if insert. */ public boolean isInsertStatement(PreparedStatement nativeStmt); + + /** + * @return Message converter. + */ + public MessageConverter messageConverter(); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java (revision ) @@ -20,7 +20,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -66,7 +66,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { try { return (Value)GEOMETRY_FROM_BYTES.invoke(null, new Object[]{b}); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java (revision ) @@ -279,7 +279,7 @@ if (nodeId == null) nodeId = F.first(sources); - addPage0(new GridResultPage(null, nodeId, null) { + addPage0(new GridResultPage(nodeId, null) { @Override public boolean isFail() { return true; } @@ -368,7 +368,7 @@ protected final GridResultPage createDummyLastPage(GridResultPage lastPage) { assert !lastPage.isDummyLast(); // It must be a real last page. - return new GridResultPage(ctx, lastPage.source(), null).setLast(true); + return new GridResultPage(lastPage.source(), null).setLast(true); } /** \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java (revision ) @@ -19,10 +19,32 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Array; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Boolean; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Byte; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Bytes; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2CacheObject; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Date; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Decimal; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Double; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Float; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Geometry; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Integer; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2JavaObject; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Long; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Null; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Short; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2String; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Time; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Timestamp; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2Uuid; +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -260,6 +282,78 @@ Value h2Val = desc.wrap(val, objType); return h2Val.convertTo(type).getObject(); + } + + /** + * @param ctx Kernal context. + * @param v Value. + * @return Message. + * @throws IgniteCheckedException If failed. + */ + public static GridH2ValueMessage toMessage(GridKernalContext ctx, Value v) throws IgniteCheckedException { + switch (v.getType()) { + case Value.NULL: + return GridH2Null.INSTANCE; + + case Value.BOOLEAN: + return new GridH2Boolean(v); + + case Value.BYTE: + return new GridH2Byte(v); + + case Value.SHORT: + return new GridH2Short(v); + + case Value.INT: + return new GridH2Integer(v); + + case Value.LONG: + return new GridH2Long(v); + + case Value.DECIMAL: + return new GridH2Decimal(v); + + case Value.DOUBLE: + return new GridH2Double(v); + + case Value.FLOAT: + return new GridH2Float(v); + + case Value.DATE: + return new GridH2Date(v); + + case Value.TIME: + return new GridH2Time(v); + + case Value.TIMESTAMP: + return new GridH2Timestamp(v); + + case Value.BYTES: + return new GridH2Bytes(v); + + case Value.STRING: + case Value.STRING_FIXED: + case Value.STRING_IGNORECASE: + return new GridH2String(v); + + case Value.ARRAY: + return new GridH2Array(ctx, v); + + case Value.JAVA_OBJECT: + if (v instanceof GridH2ValueCacheObject) + return new GridH2CacheObject(ctx, (GridH2ValueCacheObject)v); + + return new GridH2JavaObject(v); + + case Value.UUID: + return new GridH2Uuid(v); + + case Value.GEOMETRY: + return new GridH2Geometry(v); + + default: + throw new IllegalStateException("Unsupported H2 type: " + v.getType()); + } } /** Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageConverter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageConverter.java (revision ) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageConverter.java (revision ) @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; +import org.h2.value.Value; + +/** + * Message converter. + */ +public class GridH2ValueMessageConverter implements MessageConverter { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public GridH2ValueMessageConverter(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public GridH2ValueMessage convertOnWrite(Value val) { + try { + return H2Utils.toMessage(ctx, val); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to convert H2 value to message: " + val, e); + } + } + + /** {@inheritDoc} */ + @Override public Value convertOnRead(GridH2ValueMessage msg) { + try { + return msg.value(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to convert message to H2 value: " + msg, e); + } + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java (revision ) @@ -17,17 +17,20 @@ package org.apache.ignite.internal.direct.stream; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + /** * Direct marshalling I/O stream. */ @@ -171,8 +174,10 @@ * @param col Collection. * @param itemType Component type. * @param writer Writer. + * @param converter Optional converter. */ - public void writeCollection(Collection col, MessageCollectionItemType itemType, MessageWriter writer); + public void writeCollection(Collection col, MessageCollectionItemType itemType, MessageWriter writer, + @Nullable MessageConverter converter); /** * @param map Map. @@ -303,6 +308,16 @@ * @return Collection. */ public > C readCollection(MessageCollectionItemType itemType, MessageReader reader); + + + /** + * @param itemType Item type. + * @param reader Reader. + * @param converter Optional converter. + * @return Collection. + */ + public > C readCollection(MessageCollectionItemType itemType, MessageReader reader, + @Nullable MessageConverter converter); /** * @param keyType Key type. Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java (revision ) @@ -20,7 +20,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -58,7 +58,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueDecimal.get(new BigDecimal(new BigInteger(b), scale)); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java (revision ) @@ -17,22 +17,25 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; -import java.util.Collection; -import java.util.Iterator; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.MessageFactoryEx; import org.apache.ignite.internal.processors.cache.query.QueryTable; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.h2.value.Value; import org.jetbrains.annotations.Nullable; /** * H2 Value message factory. */ -public class GridH2ValueMessageFactory implements MessageFactory { +public class GridH2ValueMessageFactory implements MessageFactoryEx { + /** Kernal context. */ + private GridKernalContext ctx; + /** {@inheritDoc} */ + @Override public void kernalContext(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ @Nullable @Override public Message create(short type) { switch (type) { case -4: @@ -90,7 +93,7 @@ return new GridH2Geometry(); case -22: - return new GridH2CacheObject(); + return new GridH2CacheObject(ctx); case -30: return new GridH2IndexRangeRequest(); @@ -115,110 +118,5 @@ } return null; - } - - /** - * @param src Source values. - * @param dst Destination collection. - * @return Destination collection. - * @throws IgniteCheckedException If failed. - */ - public static Collection toMessages(Collection src, Collection dst) - throws IgniteCheckedException { - for (Value[] row : src) { - for (Value val : row) - dst.add(toMessage(val)); - } - - return dst; - } - - /** - * @param src Source iterator. - * @param dst Array to fill with values. - * @param ctx Kernal context. - * @return Filled array. - * @throws IgniteCheckedException If failed. - */ - public static Value[] fillArray(Iterator src, Value[] dst, GridKernalContext ctx) - throws IgniteCheckedException { - for (int i = 0; i < dst.length; i++) { - Message msg = src.next(); - - dst[i] = ((GridH2ValueMessage)msg).value(ctx); - } - - return dst; - } - - /** - * @param v Value. - * @return Message. - * @throws IgniteCheckedException If failed. - */ - public static GridH2ValueMessage toMessage(Value v) throws IgniteCheckedException { - switch (v.getType()) { - case Value.NULL: - return GridH2Null.INSTANCE; - - case Value.BOOLEAN: - return new GridH2Boolean(v); - - case Value.BYTE: - return new GridH2Byte(v); - - case Value.SHORT: - return new GridH2Short(v); - - case Value.INT: - return new GridH2Integer(v); - - case Value.LONG: - return new GridH2Long(v); - - case Value.DECIMAL: - return new GridH2Decimal(v); - - case Value.DOUBLE: - return new GridH2Double(v); - - case Value.FLOAT: - return new GridH2Float(v); - - case Value.DATE: - return new GridH2Date(v); - - case Value.TIME: - return new GridH2Time(v); - - case Value.TIMESTAMP: - return new GridH2Timestamp(v); - - case Value.BYTES: - return new GridH2Bytes(v); - - case Value.STRING: - case Value.STRING_FIXED: - case Value.STRING_IGNORECASE: - return new GridH2String(v); - - case Value.ARRAY: - return new GridH2Array(v); - - case Value.JAVA_OBJECT: - if (v instanceof GridH2ValueCacheObject) - return new GridH2CacheObject((GridH2ValueCacheObject)v); - - return new GridH2JavaObject(v); - - case Value.UUID: - return new GridH2Uuid(v); - - case Value.GEOMETRY: - return new GridH2Geometry(v); - - default: - throw new IllegalStateException("Unsupported H2 type: " + v.getType()); - } } } \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueShort.get(x); } Index: modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java (revision ) @@ -24,6 +24,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * Communication message reader. @@ -255,6 +256,17 @@ * @return Collection. */ public > C readCollection(String name, MessageCollectionItemType itemType); + + /** + * Reads collection. + * + * @param name Field name. + * @param itemType Collection item type. + * @param converter Converter. + * @return Collection. + */ + public > C readCollection(String name, MessageCollectionItemType itemType, + @Nullable MessageConverter converter); /** * Reads map. Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -50,7 +50,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueString.get(x); } Index: modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java (revision ) @@ -119,7 +119,7 @@ assertEquals(m.directType(), type); - GridIoMessageFactory msgFactory = new GridIoMessageFactory(null); + GridIoMessageFactory msgFactory = new GridIoMessageFactory(null, null); Message mx = msgFactory.create(type); Index: modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java (revision ) @@ -17,15 +17,6 @@ package org.apache.ignite.internal.direct.stream.v1; -import java.lang.reflect.Array; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.UUID; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -36,9 +27,21 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; import sun.nio.ch.DirectBuffer; +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; + /** * Direct marshalling I/O stream (version 1). */ @@ -540,8 +543,9 @@ } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void writeCollection(Collection col, MessageCollectionItemType itemType, - MessageWriter writer) { + MessageWriter writer, @Nullable MessageConverter converter) { if (col != null) { if (it == null) { writeInt(col.size()); @@ -553,9 +557,13 @@ } while (it.hasNext() || cur != NULL) { - if (cur == NULL) + if (cur == NULL) { cur = it.next(); + if (converter != null) + cur = converter.convertOnWrite(cur); + } + write(itemType, cur, writer); if (!lastFinished) @@ -889,9 +897,14 @@ } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public > C readCollection(MessageCollectionItemType itemType, MessageReader reader) { + return readCollection(itemType, reader, null); + } + + @SuppressWarnings("unchecked") + @Override public > C readCollection(MessageCollectionItemType itemType, + MessageReader reader, @Nullable MessageConverter converter) { if (readSize == -1) { int size = readInt(); @@ -910,6 +923,9 @@ if (!lastFinished) return null; + + if (converter != null) + item = converter.convertOnRead(item); col.add(item); Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueByte.get(x); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -51,7 +51,7 @@ /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueTime.fromNanos(nanos); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -50,7 +50,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueBytes.getNoCopy(b); } Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java (revision ) @@ -150,7 +150,9 @@ e0.writeTo(buf, writer); CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry(); - e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1)); + + e1.readFrom(ByteBuffer.wrap(buf.array()), + new DirectMessageReader(new GridIoMessageFactory(null, null), (byte)1)); assertEquals(e0.cacheId(), e1.cacheId()); assertEquals(e0.eventType(), e1.eventType()); \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueDouble.get(x); } Index: modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java (revision ) @@ -17,12 +17,14 @@ package org.apache.ignite.plugin.extensions.communication; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; import java.util.Map; import java.util.UUID; -import org.apache.ignite.lang.IgniteUuid; /** * Communication message writer. @@ -272,6 +274,18 @@ * @return Whether value was fully written. */ public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType); + + /** + * Writes collection. + * + * @param name Field name. + * @param col Collection. + * @param itemType Collection item type. + * @param itemConverter Item converter. + * @return Whether value was fully written. + */ + public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType, + @Nullable MessageConverter itemConverter); /** * Writes map. Index: modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java (revision ) @@ -17,38 +17,41 @@ package org.apache.ignite.internal.direct.stream.v2; -import java.lang.reflect.Array; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.RandomAccess; -import java.util.UUID; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageReader; + import org.apache.ignite.IgniteException; + import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; + import org.apache.ignite.internal.util.GridUnsafe; + import org.apache.ignite.internal.util.tostring.GridToStringExclude; + import org.apache.ignite.internal.util.typedef.internal.S; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgniteUuid; + import org.apache.ignite.plugin.extensions.communication.Message; + import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; + import org.apache.ignite.plugin.extensions.communication.MessageFactory; + import org.apache.ignite.plugin.extensions.communication.MessageReader; + import org.apache.ignite.plugin.extensions.communication.MessageConverter; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; + import org.apache.ignite.plugin.extensions.communication.MessageWriter; + import org.jetbrains.annotations.Nullable; -import sun.nio.ch.DirectBuffer; + import sun.nio.ch.DirectBuffer; + import java.lang.reflect.Array; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.BitSet; + import java.util.Collection; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.RandomAccess; + import java.util.UUID; + -import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN; -import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF; -import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN; + import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF; + import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF; /** * Direct marshalling I/O stream (version 2). @@ -700,11 +703,12 @@ } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void writeCollection(Collection col, MessageCollectionItemType itemType, - MessageWriter writer) { + MessageWriter writer, @Nullable MessageConverter converter) { if (col != null) { if (col instanceof List && col instanceof RandomAccess) - writeRandomAccessList((List)col, itemType, writer); + writeRandomAccessList((List)col, itemType, writer, converter); else { if (it == null) { writeInt(col.size()); @@ -716,9 +720,13 @@ } while (it.hasNext() || cur != NULL) { - if (cur == NULL) + if (cur == NULL) { cur = it.next(); + if (converter != null) + cur = converter.convertOnWrite(cur); + } + write(itemType, cur, writer); if (!lastFinished) @@ -738,8 +746,11 @@ * @param list List. * @param itemType Component type. * @param writer Writer. + * @param converter Optional message converter. */ - private void writeRandomAccessList(List list, MessageCollectionItemType itemType, MessageWriter writer) { + @SuppressWarnings("unchecked") + private void writeRandomAccessList(List list, MessageCollectionItemType itemType, MessageWriter writer, + @Nullable MessageConverter converter) { assert list instanceof RandomAccess; int size = list.size(); @@ -754,9 +765,13 @@ } while (arrPos < size || arrCur != NULL) { - if (arrCur == NULL) + if (arrCur == NULL) { arrCur = list.get(arrPos++); + if (converter != null) + arrCur = converter.convertOnWrite(arrCur); + } + write(itemType, arrCur, writer); if (!lastFinished) @@ -1224,9 +1239,15 @@ } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public > C readCollection(MessageCollectionItemType itemType, MessageReader reader) { + return readCollection(itemType, reader, null); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public > C readCollection(MessageCollectionItemType itemType, + MessageReader reader, @Nullable MessageConverter converter) { if (readSize == -1) { int size = readInt(); @@ -1245,6 +1266,9 @@ if (!lastFinished) return null; + + if (converter != null) + item = converter.convertOnRead(item); col.add(item); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (revision ) @@ -103,6 +103,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.thread.IgniteThread; @@ -2467,6 +2468,13 @@ */ public CacheQueryObjectValueContext objectContext() { return valCtx; + } + + /** + * @return Message converter. + */ + public MessageConverter messageConverter() { + return idx.messageConverter(); } /** Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueBoolean.get(x); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java (revision ) @@ -32,11 +32,10 @@ /** * Gets H2 value. * - * @param ctx Kernal context. * @return Value. * @throws IgniteCheckedException If failed. */ - public abstract Value value(GridKernalContext ctx) throws IgniteCheckedException; + public abstract Value value() throws IgniteCheckedException; /** {@inheritDoc} */ @Override public void onAckReceived() { Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageEx.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageEx.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/MessageEx.java (revision ) @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Extended IO message interface. + */ +public interface MessageEx extends Message { + /** + * Set kernal context. + * + * @param ctx Kernal context. + */ + void kernalContext(GridKernalContext ctx); +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java (revision ) @@ -23,15 +23,10 @@ import java.util.NoSuchElementException; import java.util.UUID; import javax.cache.CacheException; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.value.Value; -import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.fillArray; - /** * Page result. */ @@ -52,12 +47,13 @@ private boolean last; /** - * @param ctx Kernal context. + * Constructor. + * * @param src Source. * @param res Response. */ @SuppressWarnings("unchecked") - public GridResultPage(final GridKernalContext ctx, UUID src, GridQueryNextPageResponse res) { + public GridResultPage(UUID src, GridQueryNextPageResponse res) { assert src != null; this.src = src; @@ -65,47 +61,43 @@ // res == null means that it is a terminating dummy page for the given source node ID. if (res != null) { - Collection plainRows = res.plainRows(); - - if (plainRows != null) { - rowsInPage = plainRows.size(); - - rows = (Iterator)plainRows.iterator(); - } - else { - final int cols = res.columns(); + final int cols = res.columns(); - rowsInPage = res.values().size() / cols; + Collection vals = (Collection)res.values(); - final Iterator valsIter = res.values().iterator(); + rowsInPage = vals.size() / cols; + final Iterator valsIter = vals.iterator(); + - rows = new Iterator() { - /** */ - int rowIdx; + rows = new Iterator() { + /** */ + int rowIdx; - @Override public boolean hasNext() { - return rowIdx < rowsInPage; - } + @Override public boolean hasNext() { + return rowIdx < rowsInPage; + } - @Override public Value[] next() { - if (!hasNext()) - throw new NoSuchElementException(); + @Override public Value[] next() { + if (!hasNext()) + throw new NoSuchElementException(); - rowIdx++; + rowIdx++; - try { - return fillArray(valsIter, new Value[cols], ctx); + Value[] dst = new Value[cols]; + + for (int i = 0; i < dst.length; i++) { + Value val = valsIter.next(); + + dst[i] = val; - } + } - catch (IgniteCheckedException e) { - throw new CacheException(e); + + return dst; - } + } - } - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; - } } else { rowsInPage = 0; \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -54,7 +54,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueTimestamp.fromDateValueAndNanos(date, nanos); } Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (revision ) @@ -55,6 +55,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -361,6 +362,11 @@ /** {@inheritDoc} */ @Override public boolean isInsertStatement(PreparedStatement nativeStmt) { return false; + } + + /** {@inheritDoc} */ + @Override public MessageConverter messageConverter() { + return null; } } } Index: modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageConverter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageConverter.java (revision ) +++ modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageConverter.java (revision ) @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** + * Message converter. + */ +public interface MessageConverter { + /** + * Convert object on write (from real object to object for the stream). + * + * @param obj Object. + * @return Converted object. + */ + REPLACEMENT convertOnWrite(ORIGINAL obj); + + /** + * Convert object on read (from object for the stream to real object). + * + * @param obj Object. + * @return Converted object. + */ + ORIGINAL convertOnRead(REPLACEMENT obj); +} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java (revision ) @@ -69,7 +69,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.thread.IgniteThread; import org.h2.jdbc.JdbcResultSet; import org.h2.value.Value; @@ -84,7 +83,6 @@ import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; -import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; /** * Map query executor. @@ -829,11 +827,16 @@ try { boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.queryRequestId(), segmentId, qry, page, + Collection vals = new ArrayList<>(rows.size() * res.columnCount()); + + for (Value[] row : rows) + Collections.addAll(vals, row); + + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(ctx, qr.queryRequestId(), segmentId, qry, + page, page == 0 ? res.rowCount() : -1, res.columnCount(), - loc ? null : toMessages(rows, new ArrayList(res.columnCount())), - loc ? rows : null, + vals, last); if (loc) @@ -857,11 +860,9 @@ try { boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId, + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(ctx, reqId, segmentId, - /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, + /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, - loc ? null : Collections.emptyList(), - loc ? Collections.emptyList() : null, - false); + Collections.emptyList(), false); msg.retry(h2.readyTopologyVersion()); \ No newline at end of file Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueLong.get(x); } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java (revision ) @@ -22,7 +22,9 @@ import java.util.Collection; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.managers.communication.MessageEx; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -34,10 +36,14 @@ * Next page response. */ @IgniteCodeGeneratingFail -public class GridQueryNextPageResponse implements Message { +public class GridQueryNextPageResponse implements MessageEx { /** */ private static final long serialVersionUID = 0L; + /** Kernal context. */ + @GridDirectTransient + private transient GridKernalContext ctx; + /** */ private long qryReqId; @@ -56,15 +62,11 @@ /** */ private int cols; - /** */ + /** Flat values. */ @GridDirectCollection(Message.class) - private Collection vals; + private transient Collection vals; /** */ - @GridDirectTransient - private transient Collection plainRows; - - /** */ private AffinityTopologyVersion retry; /** Last page flag. */ @@ -78,21 +80,24 @@ } /** + * Constructor. + * + * @param ctx Kernal context. * @param qryReqId Query request ID. * @param segmentId Index segment ID. * @param qry Query. * @param page Page. * @param allRows All rows count. * @param cols Number of columns in row. - * @param vals Values for rows in this page added sequentially. - * @param plainRows Not marshalled rows for local node. + * @param vals Values. * @param last Last page flag. */ - public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols, - Collection vals, Collection plainRows, boolean last) { - assert vals != null ^ plainRows != null; + public GridQueryNextPageResponse(GridKernalContext ctx, long qryReqId, int segmentId, int qry, int page, + int allRows, int cols, Collection vals, boolean last) { + assert vals != null; assert cols > 0 : cols; + this.ctx = ctx; this.qryReqId = qryReqId; this.segmentId = segmentId; this.qry = qry; @@ -100,7 +105,6 @@ this.allRows = allRows; this.cols = cols; this.vals = vals; - this.plainRows = plainRows; this.last = last; } @@ -149,15 +153,13 @@ /** * @return Values. */ - public Collection values() { + public Collection values() { return vals; } - /** - * @return Plain rows. - */ - public Collection plainRows() { - return plainRows; + /** {@inheritDoc} */ + @Override public void kernalContext(GridKernalContext ctx) { + this.ctx = ctx; } /** {@inheritDoc} */ @@ -208,7 +210,8 @@ writer.incrementState(); case 5: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG, + ctx.query().messageConverter())) return false; writer.incrementState(); @@ -284,7 +287,7 @@ reader.incrementState(); case 5: - vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + vals = reader.readCollection("vals", MessageCollectionItemType.MSG, ctx.query().messageConverter()); if (!reader.isLastRead()) return false; @@ -359,8 +362,6 @@ /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridQueryNextPageResponse.class, this, - "valsSize", vals != null ? vals.size() : 0, - "rowsSize", plainRows != null ? plainRows.size() : 0); + return S.toString(GridQueryNextPageResponse.class, this, "valsSize", vals.size()); } } \ No newline at end of file Index: modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java (revision ) @@ -570,7 +570,7 @@ /** {@inheritDoc} */ @Override public MessageFactory messageFactory() { if (factory == null) - factory = new GridIoMessageFactory(null); + factory = new GridIoMessageFactory(null, null); return factory; } Index: modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java (revision ) @@ -17,11 +17,6 @@ package org.apache.ignite.internal.direct; -import java.nio.ByteBuffer; -import java.util.BitSet; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -33,9 +28,16 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageConverter; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + /** * Message writer implementation. */ @@ -283,9 +285,15 @@ /** {@inheritDoc} */ @Override public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType) { + return writeCollection(name, col, itemType, null); + } + + /** {@inheritDoc} */ + @Override public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType, + @Nullable MessageConverter itemConverter) { DirectByteBufferStream stream = state.item().stream; - stream.writeCollection(col, itemType, this); + stream.writeCollection(col, itemType, this, itemConverter); return stream.lastFinished(); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueInt.get(x); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -54,7 +54,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueUuid.get(high, low); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java (revision ) @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg; import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.h2.value.Value; @@ -48,7 +48,7 @@ } /** {@inheritDoc} */ - @Override public Value value(GridKernalContext ctx) { + @Override public Value value() { return ValueFloat.get(x); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java (revision ) @@ -17,12 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -34,15 +28,16 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; -import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -68,6 +63,23 @@ import org.h2.value.ValueNull; import org.jetbrains.annotations.Nullable; +import javax.cache.CacheException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import static java.util.Collections.emptyIterator; import static java.util.Collections.singletonList; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY; @@ -675,7 +687,7 @@ for (int i = 0; i < cols; i++) { try { - vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i))); + vals.add(H2Utils.toMessage(ctx.kernalContext(), row.getValue(i))); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -710,7 +722,7 @@ continue; try { - vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx); + vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -737,7 +749,7 @@ break; try { - vals.add(GridH2ValueMessageFactory.toMessage(val)); + vals.add(H2Utils.toMessage(ctx.kernalContext(), val)); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -784,7 +796,7 @@ for (int i = 0; i < vals0.length; i++) { try { - vals0[i] = vals.get(i).value(ctx); + vals0[i] = vals.get(i).value(); } catch (IgniteCheckedException e) { throw new CacheException(e); Index: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision b7d198ea3dc70ec1ceb28cb9def26c315ea5d365) +++ modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java (revision ) @@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.GridJobSiblingsRequest; import org.apache.ignite.internal.GridJobSiblingsResponse; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskCancelRequest; import org.apache.ignite.internal.GridTaskSessionRequest; import org.apache.ignite.internal.IgniteDiagnosticMessage; @@ -163,13 +164,17 @@ /** Custom messages registry. Used for test purposes. */ private static final Map> CUSTOM = new ConcurrentHashMap8<>(); + /** Kernal context. */ + private final GridKernalContext ctx; + /** Extensions. */ private final MessageFactory[] ext; /** * @param ext Extensions. */ - public GridIoMessageFactory(MessageFactory[] ext) { + public GridIoMessageFactory(GridKernalContext ctx, MessageFactory[] ext) { + this.ctx = ctx; this.ext = ext; } @@ -900,6 +905,9 @@ if (msg == null) throw new IgniteException("Invalid message type: " + type); + + if (msg instanceof MessageEx) + ((MessageEx)msg).kernalContext(ctx); return msg; }