Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (revision bd67b3181988aacd4e59d941aa82d5598a0e0d00) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) @@ -29,6 +29,8 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -726,6 +728,11 @@ return cur != null ? cur.getAll() : Collections.>emptyList(); } + @Override + public Spliterator> spliterator() { + return cur != null ? cur.spliterator() : Spliterators.emptySpliterator(); + } + @Override public void close() { if (cur != null) cur.close(); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java (revision bd67b3181988aacd4e59d941aa82d5598a0e0d00) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Spliterator; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; @@ -96,6 +98,10 @@ return new AutoClosableCursorIterator<>(this, iter()); } + @Override public Spliterator spliterator() { + return iterExec == null ? QueryCursorEx.super.spliterator() : iterExec.spliterator(); + } + /** * @return An simple iterator. */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueIterable.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueIterable.java (revision bd67b3181988aacd4e59d941aa82d5598a0e0d00) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueIterable.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) @@ -22,6 +22,7 @@ import javax.cache.Cache; import java.util.Iterator; import java.util.List; +import java.util.Spliterator; /** * SqlQuery key-value iterable. @@ -44,6 +45,11 @@ return new QueryKeyValueIterator<>(cur.iterator()); } + @Override + public Spliterator> spliterator() { + return new QueryKeyValueSpliterator<>(cur.spliterator()); + } + /** * @return Underlying fields query cursor. */ Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueSpliterator.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueSpliterator.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryKeyValueSpliterator.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; + +import javax.cache.Cache; +import javax.cache.CacheException; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.function.Consumer; + +/** + * SqlQuery key-value spliterator. + */ +public class QueryKeyValueSpliterator implements Spliterator> { + /** Target spliterator. */ + private final Spliterator> spliterator; + + /** + * Constructor. + * + * @param spliterator Target spliterator. + */ + public QueryKeyValueSpliterator(Spliterator> spliterator) { + this.spliterator = spliterator; + } + + @Override + public boolean tryAdvance(Consumer> action) { + return spliterator.tryAdvance(new Consumer>() { + @Override + public void accept(List row) { + action.accept(new CacheEntryImpl<>((K)row.get(0), (V)row.get(1))); + } + }); + } + + @Override + public Spliterator> trySplit() { + return null; + } + + @Override + public long estimateSize() { + return spliterator.estimateSize(); + } + + @Override + public int characteristics() { + return spliterator.characteristics(); + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java (revision bd67b3181988aacd4e59d941aa82d5598a0e0d00) +++ modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java (revision 94185d8e368af201f52ca37a310b04e3ef014caa) @@ -17,24 +17,31 @@ package org.apache.ignite.internal; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.P2; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; @@ -305,6 +312,34 @@ } } + @Test + public void testCacheContinuousQuerySpliteratorMultipleCalls() { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + IgniteCache clientCache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate() { + @Override public boolean apply(Object key, Object val) { + return key != null; + } + })); + qry.setAutoUnsubscribe(true); + + qry.setLocalListener(lsnr); + + try(QueryCursor cur = clientCache.query(qry)) { + cur.iterator(); + cur.spliterator(); + GridTestUtils.assertThrows(log, IgniteException.class, "Iterator is already fetched or query was cancelled.", cur, "iterator"); + } + } + /** * @param client Client. * @param clientCache Client cache.