From 9a851a58ad5eb43f5b868d4a72013ce4a9beb395 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Sat, 9 Dec 2017 20:39:26 +0300 Subject: [PATCH 01/10] IGNITE-7039: Partitions reservation for the local queries is implemented. --- .../query/h2/DmlStatementsProcessor.java | 5 +- .../processors/query/h2/IgniteH2Indexing.java | 268 +++++++++++++++++- ...vation.java => ReplicatedReservation.java} | 6 +- ...eservationKey.java => ReservationKey.java} | 8 +- .../h2/twostep/GridMapQueryExecutor.java | 172 +---------- ...IgniteCacheLocalQueryReservationsTest.java | 223 +++++++++++++++ .../h2/GridIndexingSpiAbstractSelfTest.java | 4 +- 7 files changed, 495 insertions(+), 191 deletions(-) rename modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/{twostep/MapReplicatedReservation.java => ReplicatedReservation.java} (85%) rename modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/{twostep/MapReservationKey.java => ReservationKey.java} (89%) create mode 100644 modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index c3d48dd1387..cf1cf268dbf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -295,7 +295,7 @@ long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Obje final ArrayList> data = new ArrayList<>(plan.rowsNum); final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry, - F.asList(args), null, false, 0, null); + F.asList(args), null, false, 0, null, null); QueryCursorImpl> stepCur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { @@ -401,7 +401,8 @@ private UpdateResult executeUpdateStatement(String schemaName, final GridCacheCo } else { final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, - F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); + F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel, + fieldsQry.getPartitions()); cur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index f3a95a56a42..9cde766ea8b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -29,6 +29,7 @@ import java.sql.Statement; import java.sql.Types; import java.text.MessageFormat; +import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -38,6 +39,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -67,6 +69,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -125,6 +130,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; @@ -138,18 +144,21 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; import org.h2.command.dml.Insert; +import org.h2.command.dml.Select; import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.index.Index; import org.h2.jdbc.JdbcStatement; import org.h2.server.web.WebServer; import org.h2.table.IndexColumn; +import org.h2.table.Table; import org.h2.tools.Server; import org.h2.util.JdbcUtils; import org.jetbrains.annotations.Nullable; @@ -161,6 +170,8 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.IgniteSystemProperties.getString; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; @@ -332,6 +343,9 @@ private final GridBoundedConcurrentLinkedHashMap twoStepCache = new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE); + /** */ + private final ConcurrentMap reservations = new ConcurrentHashMap8<>(); + /** */ private final IgniteInClosure> logger = new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture fut) { @@ -840,14 +854,14 @@ public GridH2IndexBase createSortedIndex(String name, GridH2Table tbl, boolean p @SuppressWarnings("unchecked") public GridQueryFieldsResult queryLocalSqlFields(final String schemaName, final String qry, @Nullable final Collection params, final IndexingQueryFilter filter, boolean enforceJoinOrder, - final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException { + final int timeout, final GridQueryCancel cancel, final int[] parts) throws IgniteCheckedException { final Connection conn = connectionForSchema(schemaName); H2Utils.setupConnection(conn, false, enforceJoinOrder); final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true); - Prepared p = GridSqlQueryParser.prepared(stmt); + final Prepared p = GridSqlQueryParser.prepared(stmt); if (DmlStatementsProcessor.isDmlStatement(p)) { SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry); @@ -873,20 +887,32 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) throw new IgniteCheckedException("Cannot prepare query metadata", e); } - final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) - .filter(filter).distributedJoinMode(OFF); - return new GridQueryFieldsResultAdapter(meta, null) { @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { assert GridH2QueryContext.get() == null; - GridH2QueryContext.set(ctx); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, true); runs.putIfAbsent(run.id(), run); + List cacheIds = collectCacheIds(p); + + AffinityTopologyVersion topVer = readyTopologyVersion(); + + List reserved = new ArrayList<>(); + + if (!reservePartitions(cacheIds, topVer, parts, reserved)) + throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); + + GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) + .filter(filter) + .distributedJoinMode(OFF) + .reservations(reserved); + + GridH2QueryContext.set(ctx); + try { ResultSet rs = executeSqlQueryWithTimer(stmt, conn, qry, params, timeout, cancel); @@ -895,6 +921,8 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) finally { GridH2QueryContext.clearThreadLocal(); + ctx.clearContext(false); + runs.remove(run.id()); } } @@ -1103,7 +1131,7 @@ public void bindParameters(PreparedStatement stmt, Object[] args = qry.getArgs(); final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, F.asList(args), filter, - qry.isEnforceJoinOrder(), qry.getTimeout(), cancel); + qry.isEnforceJoinOrder(), qry.getTimeout(), cancel, qry.getPartitions()); QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { @@ -1191,8 +1219,43 @@ public void bindParameters(PreparedStatement stmt, H2Utils.setupConnection(conn, false, false); - GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter) - .distributedJoinMode(OFF)); + List cacheIds = Collections.singletonList(tbl.cache().cacheId()); + + AffinityTopologyVersion topVer = readyTopologyVersion(); + + List reserved = new ArrayList<>(); + + int[] parts = null; + + if (filter != null) { + String name = tbl.cache().name(); + + IndexingQueryCacheFilter cacheFilter = filter.forCache(name); + + if (cacheFilter != null) { + int partitions = tbl.cache().topology().partitions(); + + List filteredParts = new ArrayList<>(partitions); + + for (int i = 0; i < partitions; i++) { + if (cacheFilter.applyPartition(i)) + filteredParts.add(i); + } + + parts = U.toIntArray(filteredParts); + } + } + + if (!reservePartitions(cacheIds, topVer, parts, reserved)) + throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); + + GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) + .filter(filter) + .distributedJoinMode(OFF) + .reservations(reserved); + + GridH2QueryContext.set(ctx); GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, schemaName, U.currentTimeMillis(), null, true); @@ -1207,6 +1270,8 @@ public void bindParameters(PreparedStatement stmt, finally { GridH2QueryContext.clearThreadLocal(); + ctx.clearContext(false); + runs.remove(run.id()); } } @@ -2369,7 +2434,7 @@ private boolean isDefaultSchema(String schemaName) { H2Schema schema = schemas.get(schemaName); if (schema != null) { - mapQryExec.onCacheStop(cacheName); + onCacheStop(cacheName); dmlProc.onCacheStop(cacheName); // Remove this mapping only after callback to DML proc - it needs that mapping internally @@ -2574,6 +2639,30 @@ private int bindPartitionInfoParameter(CacheQueryPartitionInfo partInfo, Object[ U.close(conn, log); } + /** + * Extracts cache identifiers from {@link Prepared}. + * + * @param p Prepared statement. + * @return Relevant cache identifiers for the given prepared statement. + */ + public List collectCacheIds(Prepared p) { + List cacheIds = new ArrayList<>(); + + if (p != null && p instanceof Select) { + Select select = (Select) p; + + Set tbls = select.getTables(); + + for (Table tbl : tbls) { + if (tbl instanceof GridH2Table) + cacheIds.add(((GridH2Table)tbl).cacheId()); + } + } + + return cacheIds; + } + + /** * Collect cache identifiers from two-step query. * @@ -2611,6 +2700,163 @@ private int bindPartitionInfoParameter(CacheQueryPartitionInfo partInfo, Object[ } } + /** + * @param cacheIds Cache IDs. + * @param topVer Topology version. + * @param explicitParts Explicit partitions list. + * @param reserved Reserved list. + * @return {@code true} If all the needed partitions successfully reserved. + * @throws IgniteCheckedException If failed. + */ + public boolean reservePartitions( + @Nullable List cacheIds, + AffinityTopologyVersion topVer, + final int[] explicitParts, + List reserved + ) throws IgniteCheckedException { + assert topVer != null; + + if (F.isEmpty(cacheIds)) + return true; + + Collection partIds = wrap(explicitParts); + + for (int i = 0; i < cacheIds.size(); i++) { + GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); + + if (cctx == null) // Cache was not found, probably was not deployed yet. + return false; + + if (cctx.isLocal() || !cctx.rebalanceEnabled()) + continue; + + // For replicated cache topology version does not make sense. + final ReservationKey grpKey = new ReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); + + GridReservable r = reservations.get(grpKey); + + if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. + if (r != ReplicatedReservation.INSTANCE) { + if (!r.reserve()) + return false; // We need explicit partitions here -> retry. + + reserved.add(r); + } + } + else { // Try to reserve partitions one by one. + int partsCnt = cctx.affinity().partitions(); + + if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. + if (r == null) { // Check only once. + for (int p = 0; p < partsCnt; p++) { + GridDhtLocalPartition part = partition(cctx, p); + + // We don't need to reserve partitions because they will not be evicted in replicated caches. + if (part == null || part.state() != OWNING) + return false; + } + + // Mark that we checked this replicated cache. + reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); + } + } + else { // Reserve primary partitions for partitioned cache (if no explicit given). + if (explicitParts == null) + partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + + for (int partId : partIds) { + GridDhtLocalPartition part = partition(cctx, partId); + + if (part == null || part.state() != OWNING || !part.reserve()) + return false; + + reserved.add(part); + + // Double check that we are still in owning state and partition contents are not cleared. + if (part.state() != OWNING) + return false; + } + + if (explicitParts == null) { + // We reserved all the primary partitions for cache, attempt to add group reservation. + GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); + + if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) { + if (reservations.putIfAbsent(grpKey, grp) != null) + throw new IllegalStateException("Reservation already exists."); + + grp.onPublish(new CI1() { + @Override public void apply(GridDhtPartitionsReservation r) { + reservations.remove(grpKey, r); + } + }); + } + } + } + } + } + + return true; + } + + /** + * @param cctx Cache context. + * @param p Partition ID. + * @return Partition. + */ + private GridDhtLocalPartition partition(GridCacheContext cctx, int p) { + return cctx.topology().localPartition(p, NONE, false); + } + + /** + * @param ints Integers. + * @return Collection wrapper. + */ + private static Collection wrap(final int[] ints) { + if (ints == null) + return null; + + if (ints.length == 0) + return Collections.emptySet(); + + return new AbstractCollection() { + @SuppressWarnings("NullableProblems") + @Override public Iterator iterator() { + return new Iterator() { + /** */ + private int i = 0; + + @Override public boolean hasNext() { + return i < ints.length; + } + + @Override public Integer next() { + return ints[i++]; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override public int size() { + return ints.length; + } + }; + } + + /** + * @param cacheName Cache name. + */ + public void onCacheStop(String cacheName) { + // Drop group reservations. + for (ReservationKey grpKey : reservations.keySet()) { + if (F.eq(grpKey.cacheName(), cacheName)) + reservations.remove(grpKey); + } + } + /** * Closeable iterator. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReplicatedReservation.java similarity index 85% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReplicatedReservation.java index dd8237b6f94..50793c6a302 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReplicatedReservation.java @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.query.h2; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; /** * Mapper fake reservation object for replicated caches. */ -class MapReplicatedReservation implements GridReservable { +public class ReplicatedReservation implements GridReservable { /** */ - static final MapReplicatedReservation INSTANCE = new MapReplicatedReservation(); + static final ReplicatedReservation INSTANCE = new ReplicatedReservation(); /** {@inheritDoc} */ @Override public boolean reserve() { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReservationKey.java similarity index 89% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReservationKey.java index 9d2d7ba6968..7cbe37c0ef1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReservationKey.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.twostep; +package org.apache.ignite.internal.processors.query.h2; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; @@ -23,7 +23,7 @@ /** * Mapper reservation key. */ -public class MapReservationKey { +public class ReservationKey { /** Cache name. */ private final String cacheName; @@ -36,7 +36,7 @@ * @param cacheName Cache name. * @param topVer Topology version. */ - public MapReservationKey(String cacheName, AffinityTopologyVersion topVer) { + public ReservationKey(String cacheName, AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } @@ -56,7 +56,7 @@ public String cacheName() { if (o == null || getClass() != o.getClass()) return false; - MapReservationKey other = (MapReservationKey)o; + ReservationKey other = (ReservationKey)o; return F.eq(cacheName, other.cacheName) && F.eq(topVer, other.topVer); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 77b928f062e..5d7e480e9e0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -19,12 +19,10 @@ import java.sql.Connection; import java.sql.ResultSet; -import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -51,8 +49,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; @@ -72,22 +68,18 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.thread.IgniteThread; import org.h2.jdbc.JdbcResultSet; import org.h2.value.Value; -import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; @@ -114,9 +106,6 @@ /** */ private final GridSpinBusyLock busyLock; - /** */ - private final ConcurrentMap reservations = new ConcurrentHashMap8<>(); - /** Lazy workers. */ private final ConcurrentHashMap lazyWorkers = new ConcurrentHashMap<>(); @@ -268,151 +257,7 @@ private MapNodeResults resultsForNode(UUID nodeId) { return nodeRess; } - /** - * @param cctx Cache context. - * @param p Partition ID. - * @return Partition. - */ - private GridDhtLocalPartition partition(GridCacheContext cctx, int p) { - return cctx.topology().localPartition(p, NONE, false); - } - - /** - * @param cacheIds Cache IDs. - * @param topVer Topology version. - * @param explicitParts Explicit partitions list. - * @param reserved Reserved list. - * @return {@code true} If all the needed partitions successfully reserved. - * @throws IgniteCheckedException If failed. - */ - private boolean reservePartitions( - @Nullable List cacheIds, - AffinityTopologyVersion topVer, - final int[] explicitParts, - List reserved - ) throws IgniteCheckedException { - assert topVer != null; - - if (F.isEmpty(cacheIds)) - return true; - - Collection partIds = wrap(explicitParts); - - for (int i = 0; i < cacheIds.size(); i++) { - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); - - if (cctx == null) // Cache was not found, probably was not deployed yet. - return false; - - if (cctx.isLocal() || !cctx.rebalanceEnabled()) - continue; - - // For replicated cache topology version does not make sense. - final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); - - GridReservable r = reservations.get(grpKey); - - if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != MapReplicatedReservation.INSTANCE) { - if (!r.reserve()) - return false; // We need explicit partitions here -> retry. - - reserved.add(r); - } - } - else { // Try to reserve partitions one by one. - int partsCnt = cctx.affinity().partitions(); - - if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. - if (r == null) { // Check only once. - for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = partition(cctx, p); - - // We don't need to reserve partitions because they will not be evicted in replicated caches. - if (part == null || part.state() != OWNING) - return false; - } - - // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE); - } - } - else { // Reserve primary partitions for partitioned cache (if no explicit given). - if (explicitParts == null) - partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - - for (int partId : partIds) { - GridDhtLocalPartition part = partition(cctx, partId); - - if (part == null || part.state() != OWNING || !part.reserve()) - return false; - - reserved.add(part); - - // Double check that we are still in owning state and partition contents are not cleared. - if (part.state() != OWNING) - return false; - } - - if (explicitParts == null) { - // We reserved all the primary partitions for cache, attempt to add group reservation. - GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); - - if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) { - if (reservations.putIfAbsent(grpKey, grp) != null) - throw new IllegalStateException("Reservation already exists."); - - grp.onPublish(new CI1() { - @Override public void apply(GridDhtPartitionsReservation r) { - reservations.remove(grpKey, r); - } - }); - } - } - } - } - } - - return true; - } - - /** - * @param ints Integers. - * @return Collection wrapper. - */ - private static Collection wrap(final int[] ints) { - if (ints == null) - return null; - - if (ints.length == 0) - return Collections.emptySet(); - - return new AbstractCollection() { - @SuppressWarnings("NullableProblems") - @Override public Iterator iterator() { - return new Iterator() { - /** */ - private int i = 0; - - @Override public boolean hasNext() { - return i < ints.length; - } - - @Override public Integer next() { - return ints[i++]; - } - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override public int size() { - return ints.length; - } - }; - } /** * @param caches Cache IDs. @@ -609,7 +454,7 @@ private void onQueryRequest0( try { if (topVer != null) { // Reserve primary for topology version or explicit partitions. - if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + if (!h2.reservePartitions(cacheIds, topVer, parts, reserved)) { // Unregister lazy worker because re-try may never reach this node again. if (lazy) stopAndUnregisterCurrentLazyWorker(); @@ -758,7 +603,7 @@ private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) th List reserved = new ArrayList<>(); - if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + if (!h2.reservePartitions(cacheIds, topVer, parts, reserved)) { U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() + ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); @@ -1011,17 +856,6 @@ private void sendRetry(ClusterNode node, long reqId, int segmentId) { } } - /** - * @param cacheName Cache name. - */ - public void onCacheStop(String cacheName) { - // Drop group reservations. - for (MapReservationKey grpKey : reservations.keySet()) { - if (F.eq(grpKey.cacheName(), cacheName)) - reservations.remove(grpKey); - } - } - /** * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread). */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java new file mode 100644 index 00000000000..03bb88253b5 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for local query partitions reservation. + */ +public class IgniteCacheLocalQueryReservationsTest extends GridCommonAbstractTest { + + /** Cache name */ + private static final String PERSON_CACHE = "person"; + + /** + * Checks if a query reservation prevents cache partitions from the rebalancing. + * + * @param qry Query for entries counting. + * @throws Exception If failed. + */ + public void testReservations() throws Exception { + startGrid(0); + + int cnt = createAndFillCache(); + + System.out.println("cnt=" + cnt); + + final Query qry; + + qry = new SqlFieldsQuery("select count(*) from person where age >= 0 or age < 1000000000"); + + qry.setLocal(true); + + final IgniteCache cache = grid(0).cache(PERSON_CACHE); + + + final long start = System.currentTimeMillis(); + + List> res; /* = cache.query(qry).getAll(); + + System.out.println((System.currentTimeMillis() - start) + " res before1=" + res.get(0).get(0));*/ + + startGrid(1); + + /*final long duration = 10000; + + + Runnable r = new Runnable() { + @Override public void run() { + int i = 0; + + while (System.currentTimeMillis() < start + duration) { + List> r = cache.query(qry).getAll(); + System.out.println((System.currentTimeMillis() - start) + " res in cycle" + i + "=" + r.get(0).get(0)); + //doSleep(1); + } + } + }; + + Thread t1 = new Thread(r); + Thread t2 = new Thread(r); + + t1.start(); + doSleep(200); + t2.start(); + + t1.join(); + t2.join();*/ + + doSleep(1000); + res = cache.query(qry).getAll(); + + System.out.println((System.currentTimeMillis() - start) + " res after second started and small sleep=" + res.get(0).get(0)); + + + + doSleep(3000); + + res = cache.query(qry).getAll(); + + System.out.println("res after sleep=" + res.get(0).get(0)); + + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Creates and fills cache. + * + * @return Number of filled cache entities. + */ + private int createAndFillCache() { + CacheConfiguration cacheConf = new CacheConfiguration<>(PERSON_CACHE); + + cacheConf.setCacheMode(CacheMode.PARTITIONED) + .setBackups(0) + .setIndexedTypes(Integer.class, Person.class) + .setName(PERSON_CACHE); + + IgniteCache cache = grid(0).createCache(cacheConf); + + Affinity aff = grid(0).affinity(PERSON_CACHE); + + return fillAllPartitions(cache, aff); + } + + /** + * Fills each partition in the cache with a single data entry. + * + * @param cache - Cache to fill all partition to. + * @param aff Affinity. + * @return Number of filled partitions + */ + private int fillAllPartitions(IgniteCache cache, Affinity aff) { + /*int partsCnt = aff.partitions(); + + Set emptyParts = new HashSet<>(partsCnt); + + for (int i = 0; i < partsCnt; i++) + emptyParts.add(i); + + int cntr = 0; + + while (!emptyParts.isEmpty()) { + int part = aff.partition(cntr++); + + if (emptyParts.remove(part)) + cache.put(cntr, new Person("p_"+ cntr, cntr)); + + if (cntr > 100_000) + fail("Failed to fill all partitions"); + }*/ + + int partsCnt = 1_000_000; + + IgniteDataStreamer streamer = grid(0).dataStreamer(PERSON_CACHE); + + for (int i = 0; i < partsCnt; i++) + streamer.addData(i, new Person("p_"+ i, i)); + + streamer.flush(); + + return partsCnt; + } + + /** + * + */ + public static class Person implements Serializable { + /** Name. */ + @QuerySqlField + private String name; + + /** Age. */ + @QuerySqlField + private int age; + + /** + * @param name Name. + * @param age Age. + */ + public Person(String name, int age) { + this.name = name; + this.age = age; + } + + /** + * + */ + public String getName() { + return name; + } + + /** + * @param name Name. + */ + public void setName(String name) { + this.name = name; + } + + /** + * + */ + public int getAge() { + return age; + } + + /** + * @param age Age. + */ + public void setAge(int age) { + this.age = age; + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 62860c0d16f..c0b5d7bdbd7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -343,7 +343,7 @@ public void testSpi() throws Exception { // Fields query GridQueryFieldsResult fieldsRes = spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " + - "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null); + "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null, null); String[] aliases = {"N1", "A1", "N2", "A2"}; Object[] vals = { "Valera", 19, "Kolya", 25}; @@ -401,7 +401,7 @@ public void testLongQueries() throws Exception { range *= 3; GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.asList(1, - range), null, false, 0, null); + range), null, false, 0, null, null); assert res.iterator().hasNext(); From dac1fc52cef92eae848966e85218da79092a48fa Mon Sep 17 00:00:00 2001 From: rkondakov Date: Mon, 11 Dec 2017 12:52:04 +0300 Subject: [PATCH 02/10] IGNITE-7039: Tests implemented. --- ...IgniteCacheLocalQueryReservationsTest.java | 176 ++++++++++++------ 1 file changed, 114 insertions(+), 62 deletions(-) diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java index 03bb88253b5..9d90e300789 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java @@ -18,15 +18,27 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -37,74 +49,133 @@ /** Cache name */ private static final String PERSON_CACHE = "person"; + /** Timeout */ + private static final long TIMEOUT_MS = 30_000L; + + /** Cache size. */ + private static final int CACHE_SIZE = 2_000_000; + /** - * Checks if a query reservation prevents cache partitions from the rebalancing. + * Test for the local {@link SqlFieldsQuery} reservations. * - * @param qry Query for entries counting. * @throws Exception If failed. */ - public void testReservations() throws Exception { - startGrid(0); + public void testLocalSqlFieldsQueryReservations() throws Exception { + Query qry = new SqlFieldsQuery("select count(*) from person where age >= 0 or age < 1000000000"); - int cnt = createAndFillCache(); + checkReservations(qry); + } - System.out.println("cnt=" + cnt); + /** + * Test for the local {@link SqlQuery} reservations. + * + * @throws Exception If failed. + */ + public void testLocalSqlQueryReservations() throws Exception { + String sql = "age >= 0 or age < 1000000000"; - final Query qry; + SqlQuery qry = new SqlQuery<>(Person.class, sql); + + checkReservations(qry); + } + + /** + * Checks if a query partitions reservations occurs. + * + * @param qry Query for entries counting. + * @throws Exception If failed. + */ + private void checkReservations(final Query qry) throws Exception { + final IgniteEx grid = startGrid(0); - qry = new SqlFieldsQuery("select count(*) from person where age >= 0 or age < 1000000000"); + createAndFillCache(); qry.setLocal(true); final IgniteCache cache = grid(0).cache(PERSON_CACHE); + final CyclicBarrier crd = new CyclicBarrier(2); - final long start = System.currentTimeMillis(); + /* + * Checks this scenario: + * 1. Before the query execution all partitions should not be reserved. + * 2. During the query execution all partitions should be reserved. + * 3. After the query execution all partitions should be released. + */ + Callable reservationsChecker = new Callable() { + @Override public Boolean call() throws Exception { + final List parts = grid.context().cache().internalCache(PERSON_CACHE).context() + .topology().localPartitions(); + + // 1. Before the query execution all partitions should not be reserved. + for (GridDhtLocalPartition part : parts) { + if (part.reservations() > 0) { + log.error("Partitions should not be reserved before the query execution."); + + return false; // There are no reservations should be here yet. + } + } - List> res; /* = cache.query(qry).getAll(); + final Set partsSet = new HashSet<>(parts); - System.out.println((System.currentTimeMillis() - start) + " res before1=" + res.get(0).get(0));*/ + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Query execution starts here. - startGrid(1); + // 2. During the query execution all partitions should be reserved. + boolean wasReserved = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Iterator partsIt = partsSet.iterator(); partsIt.hasNext();) { + GridDhtLocalPartition part = partsIt.next(); + if (part.reservations() == 1) + partsIt.remove(); + } - /*final long duration = 10000; + if (partsSet.isEmpty()) + return true; // All partitions have been reserved. + else { + log.warning("Partitions should be reserved during the query execution."); + return false; + } + } + }, TIMEOUT_MS); - Runnable r = new Runnable() { - @Override public void run() { - int i = 0; + if (!wasReserved) + return false; - while (System.currentTimeMillis() < start + duration) { - List> r = cache.query(qry).getAll(); - System.out.println((System.currentTimeMillis() - start) + " res in cycle" + i + "=" + r.get(0).get(0)); - //doSleep(1); - } - } - }; + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); - Thread t1 = new Thread(r); - Thread t2 = new Thread(r); + // 3. After the query execution all partitions should be released. + boolean wasReleased = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridDhtLocalPartition part : parts) { + if (part.reservations() > 0) { + log.error("Partitions should be released after the query execution."); - t1.start(); - doSleep(200); - t2.start(); + return false; + } + } - t1.join(); - t2.join();*/ + return true; // All partitions have been released. + } + }, TIMEOUT_MS); - doSleep(1000); - res = cache.query(qry).getAll(); + return wasReleased; + } + }; - System.out.println((System.currentTimeMillis() - start) + " res after second started and small sleep=" + res.get(0).get(0)); + FutureTask reservationsResult = new FutureTask<>(reservationsChecker); + Thread checkerThread = new Thread(reservationsResult); + checkerThread.start(); - doSleep(3000); + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Wait before query execution. - res = cache.query(qry).getAll(); + cache.query(qry).getAll(); - System.out.println("res after sleep=" + res.get(0).get(0)); + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Wait after query execution. + assertTrue("Partitions are not properly reserved or released", reservationsResult.get()); } /** {@inheritDoc} */ @@ -129,46 +200,27 @@ private int createAndFillCache() { Affinity aff = grid(0).affinity(PERSON_CACHE); - return fillAllPartitions(cache, aff); + return fillCache(cache, aff); } /** - * Fills each partition in the cache with a single data entry. + * Populates cache with a data. * * @param cache - Cache to fill all partition to. * @param aff Affinity. - * @return Number of filled partitions + * @return Number of filled entities. */ - private int fillAllPartitions(IgniteCache cache, Affinity aff) { - /*int partsCnt = aff.partitions(); - - Set emptyParts = new HashSet<>(partsCnt); - - for (int i = 0; i < partsCnt; i++) - emptyParts.add(i); - - int cntr = 0; - - while (!emptyParts.isEmpty()) { - int part = aff.partition(cntr++); - - if (emptyParts.remove(part)) - cache.put(cntr, new Person("p_"+ cntr, cntr)); - - if (cntr > 100_000) - fail("Failed to fill all partitions"); - }*/ - - int partsCnt = 1_000_000; + private int fillCache(IgniteCache cache, Affinity aff) { + // Should be quite enough for a long time of query execution (hundreds of milliseconds). IgniteDataStreamer streamer = grid(0).dataStreamer(PERSON_CACHE); - for (int i = 0; i < partsCnt; i++) + for (int i = 0; i < CACHE_SIZE; i++) streamer.addData(i, new Person("p_"+ i, i)); streamer.flush(); - return partsCnt; + return CACHE_SIZE; } /** From 799ca6c743474737270ecad176e762a6dfe0d383 Mon Sep 17 00:00:00 2001 From: devozerov Date: Tue, 12 Dec 2017 17:14:00 +0300 Subject: [PATCH 03/10] Cosmetics. --- .../processors/query/h2/twostep/GridMapQueryExecutor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 5d7e480e9e0..230b67ae3e4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -257,8 +257,6 @@ private MapNodeResults resultsForNode(UUID nodeId) { return nodeRess; } - - /** * @param caches Cache IDs. * @return The first found partitioned cache. From 18ab39626cb8542a56c66a6bbf27671042c24e51 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Wed, 13 Dec 2017 22:01:42 +0300 Subject: [PATCH 04/10] IGNITE-7039: Fixes after the review: tests, eager partitions reservations. --- .../query/GridQueryFieldsResult.java | 2 +- .../query/GridQueryFieldsResultAdapter.java | 8 + .../processors/query/h2/IgniteH2Indexing.java | 97 ++++-- .../query/h2/sql/GridSqlQueryParser.java | 28 ++ ...IgniteCacheLocalQueryReservationsTest.java | 313 +++++++++++++----- 5 files changed, 353 insertions(+), 95 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java index e32a6872c4b..842e273b8db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java @@ -25,7 +25,7 @@ * Field query result. It is composed of * fields metadata and iterator over queried fields. */ -public interface GridQueryFieldsResult { +public interface GridQueryFieldsResult extends AutoCloseable { /** * Gets metadata for queried fields. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java index 7f1d175505a..df57f526683 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java @@ -33,6 +33,9 @@ /** Result iterator. */ private final GridCloseableIterator> it; + /** Underlying resource closed flag. */ + protected volatile boolean closed; + /** * Creates query field result composed of field metadata and iterator * over queried fields. @@ -55,4 +58,9 @@ public GridQueryFieldsResultAdapter(@Nullable List metaD @Override public GridCloseableIterator> iterator() throws IgniteCheckedException{ return it; } + + /** {@inheritDoc} */ + @Override public void close() { + closed = true; + } } \ No newline at end of file diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index a918fe40c28..f6980bdd5dd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -352,6 +352,9 @@ /** */ private final ConcurrentMap reservations = new ConcurrentHashMap8<>(); + /** Map from sql string to affected caches ids list */ + private final ConcurrentMap> sqlToCacheIdsCache = new ConcurrentHashMap8<>(); + /** */ private final IgniteInClosure> logger = new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture fut) { @@ -897,38 +900,47 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) throw new IgniteCheckedException("Cannot prepare query metadata", e); } - return new GridQueryFieldsResultAdapter(meta, null) { - @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { - assert GridH2QueryContext.get() == null; + final GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, + schemaName, U.currentTimeMillis(), cancel, true); - GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, - schemaName, U.currentTimeMillis(), cancel, true); + runs.putIfAbsent(run.id(), run); - runs.putIfAbsent(run.id(), run); + List cacheIds = getCacheIds(p); - List cacheIds = collectCacheIds(p); + AffinityTopologyVersion topVer = readyTopologyVersion(); - AffinityTopologyVersion topVer = readyTopologyVersion(); + final List reserved = new ArrayList<>(); - List reserved = new ArrayList<>(); + if (!reservePartitions(cacheIds, topVer, parts, reserved)) + throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); - if (!reservePartitions(cacheIds, topVer, parts, reserved)) - throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + - ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); + final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) + .filter(filter) + .distributedJoinMode(OFF) + .reservations(reserved); - GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) - .filter(filter) - .distributedJoinMode(OFF) - .reservations(reserved); + assert GridH2QueryContext.get() == null; - GridH2QueryContext.set(ctx); + GridH2QueryContext.set(ctx); + + return new GridQueryFieldsResultAdapter(meta, null) { + @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { try { ResultSet rs = executeSqlQueryWithTimer(stmt, conn, qry, params, timeout, cancel); return new H2FieldsIterator(rs); } finally { + close(); + } + } + + @Override public void close() { + if (!closed) { + super.close(); + GridH2QueryContext.clearThreadLocal(); ctx.clearContext(false); @@ -939,6 +951,32 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) }; } + /** + * Returns all affected caches ids for the give prepared statement. + * + * @param p Prepared statement. + * @return List of caches. + */ + private List getCacheIds(Prepared p) { + String sql = p.getSQL(); + + assert sql != null; + + List cacheIds = sqlToCacheIdsCache.get(sql); + + if (cacheIds == null) { + GridSqlQueryParser parser = new GridSqlQueryParser(false); + + parser.parse(p); + + cacheIds = parser.getAffectedCacheIds(); + + sqlToCacheIdsCache.put(sql, cacheIds); + } + + return cacheIds; + } + /** {@inheritDoc} */ @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params, IgniteDataStreamer streamer) throws IgniteCheckedException { @@ -1146,7 +1184,7 @@ public void bindParameters(PreparedStatement stmt, final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, F.asList(args), filter, qry.isEnforceJoinOrder(), qry.getTimeout(), cancel, qry.getPartitions()); - QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { + QueryCursorImpl> cursor = new QueryCursorImpl>(new Iterable>() { @Override public Iterator> iterator() { try { return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); @@ -1155,7 +1193,18 @@ public void bindParameters(PreparedStatement stmt, throw new IgniteException(e); } } - }, cancel); + }, cancel) { + @Override public void close() { + super.close(); + + try { + res.close(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }; cursor.fieldsMeta(res.metaData()); @@ -2539,6 +2588,16 @@ private boolean isDefaultSchema(String schemaName) { if (!F.isEmpty(qry.cacheIds()) && qry.cacheIds().contains(cacheId)) it.remove(); } + + for (Iterator>> it = sqlToCacheIdsCache.entrySet().iterator(); it.hasNext();) { + Map.Entry> entry = it.next(); + + assert entry != null; + assert entry.getValue() != null; + + if (entry.getValue().contains(cacheId)) + it.remove(); + } } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 8ffc5fab628..a18afff33a1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -22,11 +22,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.cache.CacheException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; @@ -542,6 +544,32 @@ public static PreparedWithRemaining preparedWithRemaining(PreparedStatement stmt } } + /** + * Returns a list of ids of the all caches used in the current query. + * + * @return {@link List} of cache ids. + */ + public List getAffectedCacheIds() { + Set cacheIds = new HashSet<>(); + + for (Object o : h2ObjToGridObj.values()) { + if (o instanceof GridSqlAlias) + o = GridSqlAlias.unwrap((GridSqlAst)o); + + if (o instanceof GridSqlTable) { + GridH2Table tbl = ((GridSqlTable)o).dataTable(); + + if (tbl != null) { + Integer cacheId = tbl.cache().cacheId(); + + cacheIds.add(cacheId); + } + } + } + + return new ArrayList<>(cacheIds); + } + /** * @param qry Query expression to parse. * @return Subquery AST. diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java index 9d90e300789..01881480f1e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -29,17 +30,20 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; /** * Test for local query partitions reservation. @@ -52,7 +56,7 @@ /** Timeout */ private static final long TIMEOUT_MS = 30_000L; - /** Cache size. */ + /** Cache size. Should be quite enough for a long time of query execution (hundreds of milliseconds).*/ private static final int CACHE_SIZE = 2_000_000; /** @@ -61,7 +65,7 @@ * @throws Exception If failed. */ public void testLocalSqlFieldsQueryReservations() throws Exception { - Query qry = new SqlFieldsQuery("select count(*) from person where age >= 0 or age < 1000000000"); + Query qry = new SqlFieldsQuery("select name, age from person where age >= 0 or age < 1000000000"); checkReservations(qry); } @@ -79,6 +83,32 @@ public void testLocalSqlQueryReservations() throws Exception { checkReservations(qry); } + /** + * Test for the local {@link SqlFieldsQuery} reservations. + * + * @throws Exception If failed. + */ + public void testLocalSqlFieldsQueryCursorCloseReservations() throws Exception { + Query qry = new SqlFieldsQuery("select name, age from person where age >= 0 or age < 1000000000"); + + checkEagerCursorClosingReservations(qry); + } + + /** + * Test for the local {@link SqlQuery} reservations. + * + * @throws Exception If failed. + */ + public void testLocalSqlQueryCursorCloseReservations() throws Exception { + String sql = "age >= 0 or age < 1000000000"; + + SqlQuery qry = new SqlQuery<>(Person.class, sql); + + checkEagerCursorClosingReservations(qry); + } + + + /** * Checks if a query partitions reservations occurs. * @@ -88,94 +118,235 @@ public void testLocalSqlQueryReservations() throws Exception { private void checkReservations(final Query qry) throws Exception { final IgniteEx grid = startGrid(0); - createAndFillCache(); + createCache(); + + fillCache(); qry.setLocal(true); final IgniteCache cache = grid(0).cache(PERSON_CACHE); - final CyclicBarrier crd = new CyclicBarrier(2); + final List parts = grid.context().cache().internalCache(PERSON_CACHE).context() + .topology().localPartitions(); - /* - * Checks this scenario: - * 1. Before the query execution all partitions should not be reserved. - * 2. During the query execution all partitions should be reserved. - * 3. After the query execution all partitions should be released. - */ - Callable reservationsChecker = new Callable() { - @Override public Boolean call() throws Exception { - final List parts = grid.context().cache().internalCache(PERSON_CACHE).context() - .topology().localPartitions(); + // 1. Before the cache.query() execution all partitions should be released. + checkReservationsAfterClosure(false, parts, "Partitions should be released before query.", + new IgniteClosure() { + @Override public Object apply(Object o) { + return null; // Do nothing - checking state before the query. + } + }); + + // 2. When a cursor obtained from the cache.query(), all partitions should be reserved. + final QueryCursor> cursor = checkReservationsAfterClosure(true, parts, + "Partitions should be reserved when got cursor.", + new IgniteClosure>>() { + @Override public QueryCursor> apply(Object o) { + return (QueryCursor>)cache.query(qry); + } + }); - // 1. Before the query execution all partitions should not be reserved. - for (GridDhtLocalPartition part : parts) { - if (part.reservations() > 0) { - log.error("Partitions should not be reserved before the query execution."); + // 3. During the obtaining iterator from the Cursor partitions reservations should be released for + // an SqlQuery and reserved for an SqlFieldsQuery (due to its laziness). + boolean shouldBeReserved = qry instanceof SqlFieldsQuery; - return false; // There are no reservations should be here yet. - } + final Iterator> it = checkReservationsAfterClosure(shouldBeReserved, parts, + "Partitions should be " + (shouldBeReserved ? "reserved" : "released" ) + " when obtaining iterator.", + new IgniteClosure>>() { + @Override public Iterator> apply(Object o) { + return cursor.iterator(); } + }); - final Set partsSet = new HashSet<>(parts); + // 4. When iterating over the result, all partitions should be released because entire result set is in the + // memory now. + final List res = checkReservationsAfterClosure(false, parts, + "Partitions should be released when iterating over the result set.", + new IgniteClosure>() { + @Override public List apply(Object o) { + List r = new ArrayList(); - crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Query execution starts here. + while (it.hasNext()) + r.add(it.next()); - // 2. During the query execution all partitions should be reserved. - boolean wasReserved = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (Iterator partsIt = partsSet.iterator(); partsIt.hasNext();) { - GridDhtLocalPartition part = partsIt.next(); - if (part.reservations() == 1) - partsIt.remove(); - } + return r; + } + }); - if (partsSet.isEmpty()) - return true; // All partitions have been reserved. - else { - log.warning("Partitions should be reserved during the query execution."); + assertEquals("Wrong result set size.", CACHE_SIZE, res.size()); - return false; - } - } - }, TIMEOUT_MS); + // 5. Should be released after the cursor has been closed. + checkReservationsAfterClosure(false, parts, + "Partitions should be released after the cursor has been closed.", + new IgniteClosure() { + @Override public Object apply(Object o) { + cursor.close(); - if (!wasReserved) - return false; + return null; + } + }); + } - crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); + /** + * Checks if reservations have been released after the cursor + * has been closed eagerly (before the iterator obtaining) + * + * @param qry Query. + * @throws Exception If failed. + */ + private void checkEagerCursorClosingReservations(final Query qry) throws Exception { + final IgniteEx grid = startGrid(0); - // 3. After the query execution all partitions should be released. - boolean wasReleased = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (GridDhtLocalPartition part : parts) { - if (part.reservations() > 0) { - log.error("Partitions should be released after the query execution."); + createCache(); - return false; - } - } + fillCache(); - return true; // All partitions have been released. - } - }, TIMEOUT_MS); + qry.setLocal(true); + + final IgniteCache cache = grid(0).cache(PERSON_CACHE); + + final List parts = grid.context().cache().internalCache(PERSON_CACHE).context() + .topology().localPartitions(); + + // 1. Before the cache.query() execution all partitions should be released. + checkReservationsAfterClosure(false, parts, "Partitions should be released before query.", + new IgniteClosure() { + @Override public Object apply(Object o) { + return null; // Do nothing - checking state before the query. + } + }); + + // 2. When a cursor obtained from the cache.query(), all partitions should be reserved. + final QueryCursor> cursor = checkReservationsAfterClosure(true, parts, + "Partitions should be reserved when got cursor.", + new IgniteClosure>>() { + @Override public QueryCursor> apply(Object o) { + return (QueryCursor>)cache.query(qry); + } + }); - return wasReleased; + // 3. Partitions should be released when cursor closed. + checkReservationsAfterClosure(false, parts, + "Partitions should be released when cursor is closed.", + new IgniteClosure() { + @Override public Object apply(Object o) { + cursor.close(); + + return null; + } + }); + } + + /** + * Checks reservations status after the given closure having been invoked. + * + * @param shouldBeReserved Expected reservation status. {@code True} if reserved. + * @param parts Partitions. + * @param msg Warning message. + * @param clo Closure. + * @param Closure argument parameter. + * @param Closure return parameter. + * @return Closure invocation result. + * @throws Exception If failed. + */ + private R checkReservationsAfterClosure(final boolean shouldBeReserved, + final List parts, final String msg, final IgniteClosure clo) throws Exception { + final CyclicBarrier crd = new CyclicBarrier(2); + + Callable reservationsChecker = new Callable() { + @Override public Boolean call() throws Exception { + // After this barrier closure will be invoked. + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // Check if the partitions are in the proper state. + if (shouldBeReserved) { + if (!isReserved(parts, msg)) + return false; + } else { + if (!isReleased(parts, msg)) + return false; + } + + // Wait for the end of checking. + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + return true; } }; - FutureTask reservationsResult = new FutureTask<>(reservationsChecker); + FutureTask reservationsRes = new FutureTask<>(reservationsChecker); - Thread checkerThread = new Thread(reservationsResult); + Thread checkerThread = new Thread(reservationsRes); checkerThread.start(); - crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Wait before query execution. + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + R res = clo.apply(null); - cache.query(qry).getAll(); + crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); - crd.await(TIMEOUT_MS, TimeUnit.MILLISECONDS); // Wait after query execution. + assertTrue("Partitions are not properly " + (shouldBeReserved ? "reserved." : "released."), + reservationsRes.get()); - assertTrue("Partitions are not properly reserved or released", reservationsResult.get()); + return res; + } + + /** + * Checks if all partitions are reserved. + * + * @param parts Partitions. + * @param msg Warning message. + * @return {@code True} if reserved. + * @throws IgniteInterruptedCheckedException If failed. + */ + @NotNull private Boolean isReserved(List parts, final String msg) + throws IgniteInterruptedCheckedException { + final Set partsSet = new HashSet<>(parts); + + return GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Iterator partsIt = partsSet.iterator(); partsIt.hasNext();) { + GridDhtLocalPartition part = partsIt.next(); + + if (part.reservations() == 1) + partsIt.remove(); + } + + if (partsSet.isEmpty()) + return true; // All partitions have been reserved. + else { + log.warning(msg); + + return false; + } + } + }, TIMEOUT_MS); + } + + /** + * Checks if all partitions are released. + * + * @param parts Partitions. + * @param msg Warning message. + * @return {@code True} if released. + * @throws IgniteInterruptedCheckedException If failed. + */ + private boolean isReleased(final List parts, final String msg) + throws IgniteInterruptedCheckedException { + return GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (GridDhtLocalPartition part : parts) { + if (part.reservations() > 0) { + log.warning(msg); + + return false; + } + } + + return true; // All partitions have been released. + } + }, TIMEOUT_MS); } /** {@inheritDoc} */ @@ -184,11 +355,9 @@ private void checkReservations(final Query qry) throws Exception { } /** - * Creates and fills cache. - * - * @return Number of filled cache entities. + * Creates cache. */ - private int createAndFillCache() { + private void createCache() { CacheConfiguration cacheConf = new CacheConfiguration<>(PERSON_CACHE); cacheConf.setCacheMode(CacheMode.PARTITIONED) @@ -196,23 +365,15 @@ private int createAndFillCache() { .setIndexedTypes(Integer.class, Person.class) .setName(PERSON_CACHE); - IgniteCache cache = grid(0).createCache(cacheConf); - - Affinity aff = grid(0).affinity(PERSON_CACHE); - - return fillCache(cache, aff); + grid(0).createCache(cacheConf); } /** * Populates cache with a data. * - * @param cache - Cache to fill all partition to. - * @param aff Affinity. * @return Number of filled entities. */ - private int fillCache(IgniteCache cache, Affinity aff) { - // Should be quite enough for a long time of query execution (hundreds of milliseconds). - + private int fillCache() { IgniteDataStreamer streamer = grid(0).dataStreamer(PERSON_CACHE); for (int i = 0; i < CACHE_SIZE; i++) @@ -220,6 +381,8 @@ private int fillCache(IgniteCache cache, Affinity aff) streamer.flush(); + streamer.close(); + return CACHE_SIZE; } From 2e979146fe49fd308987c0f1bb02294ee2a769b6 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Thu, 14 Dec 2017 16:37:13 +0300 Subject: [PATCH 05/10] IGNITE-7039: Fix after tests failed - problems if qursor is closed from the another thread. --- .../processors/query/h2/IgniteH2Indexing.java | 13 ++- ...gniteCacheAbstractFieldsQuerySelfTest.java | 79 ++++++++++++++++++- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index f6980bdd5dd..5c23b14a602 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -920,19 +920,20 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) .distributedJoinMode(OFF) .reservations(reserved); - assert GridH2QueryContext.get() == null; - - GridH2QueryContext.set(ctx); - return new GridQueryFieldsResultAdapter(meta, null) { - @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { try { + assert GridH2QueryContext.get() == null; + + GridH2QueryContext.set(ctx); + ResultSet rs = executeSqlQueryWithTimer(stmt, conn, qry, params, timeout, cancel); return new H2FieldsIterator(rs); } finally { + GridH2QueryContext.clearThreadLocal(); + close(); } } @@ -941,8 +942,6 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) if (!closed) { super.close(); - GridH2QueryContext.clearThreadLocal(); - ctx.clearContext(false); runs.remove(run.id()); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index a285a8bb719..85e85795da0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import javax.cache.CacheException; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; @@ -36,7 +37,6 @@ import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.AffinityKey; -import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; @@ -44,11 +44,9 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlIndexMetadata; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; -import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; -import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongValue; import org.apache.ignite.internal.processors.datastructures.GridCacheInternalKeyImpl; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; @@ -723,6 +721,79 @@ public void testQueryString() throws Exception { } } + /** + * Checks closing cursor from the another thread. + * + * @throws Exception If failed. + */ + public void testQueryCancelFromAnotherThread() throws Exception { + final QueryCursor> cur = strCache.query(sqlFieldsQuery("select * from String")); + + final CountDownLatch latch = new CountDownLatch(1); + + final Throwable[] exceptions = new Throwable[1]; + + ignite(0).scheduler().runLocal(new Runnable() { + @Override public void run() { + try { + cur.close(); + } + catch (Throwable e) { + exceptions[0] = e; + } + finally { + latch.countDown(); + } + } + }); + + latch.await(); + + assertNull("Exception occurred when cancelled from another thread.", exceptions[0]); + + Exception ex = null; + Iterator it = null; + + try { + it = cur.iterator(); + } + catch (Exception e) { + ex = e; + } + + assertNotNull("Exception hasn't been thrown during obtaining iterator from the closed cursor.", ex); + assertNull("Iterator has been fetched after the cursor had been closed.", it); + } + + /** + * Checks if simultaneous queries are executed independently. + * + * @throws Exception If failed. + */ + public void testSimultaneousQueries() throws Exception { + final QueryCursor> cur1 = strCache.query(sqlFieldsQuery("select * from String").setLocal(true)); + final QueryCursor> cur2 = strCache.query(sqlFieldsQuery("select * from String").setLocal(true)); + + cur2.close(); + + Exception ex = null; + Iterator it = null; + + try { + it = cur1.iterator(); + } + catch (Exception e) { + ex = e; + } + + assertNotNull("", it.next()); + + assertNull("No exception should be thrown if cursor is not closed.", ex); + assertNotNull("Iterator should not be empty.", it); + + cur1.close(); + } + /** @throws Exception If failed. */ public void testQueryIntegersWithJoin() throws Exception { QueryCursor> qry = intCache.query(sqlFieldsQuery( From e73fd01eab3bd645784f5d4f37886197b4077469 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Thu, 14 Dec 2017 19:16:14 +0300 Subject: [PATCH 06/10] IGNITE-7039: Small fixes after the tests failed. --- .../ignite/internal/processors/query/h2/IgniteH2Indexing.java | 2 +- .../internal/processors/query/h2/sql/GridSqlQueryParser.java | 3 ++- .../cache/IgniteCacheAbstractFieldsQuerySelfTest.java | 4 +--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5c23b14a602..ed93e031ea7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2838,7 +2838,7 @@ public boolean reservePartitions( if (cctx == null) // Cache was not found, probably was not deployed yet. return false; - if (cctx.isLocal() || !cctx.rebalanceEnabled()) + if (cctx.isLocal() || !cctx.rebalanceEnabled() || cctx.localNode().isClient()) continue; // For replicated cache topology version does not make sense. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 0a58be32a3a..eb9e61067db 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -92,6 +92,7 @@ import org.h2.table.Column; import org.h2.table.FunctionTable; import org.h2.table.IndexColumn; +import org.h2.table.MetaTable; import org.h2.table.RangeTable; import org.h2.table.Table; import org.h2.table.TableBase; @@ -618,7 +619,7 @@ private GridSqlElement parseTable(Table tbl) { // We can't cache simple tables because otherwise it will be the same instance for all // table filters. Thus we will not be able to distinguish one table filter from another. // Table here is semantically equivalent to a table filter. - if (tbl instanceof TableBase) + if (tbl instanceof TableBase || tbl instanceof MetaTable) return new GridSqlTable(tbl); // Other stuff can be cached because we will have separate instances in diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index 85e85795da0..124cb99f4ed 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -786,10 +786,8 @@ public void testSimultaneousQueries() throws Exception { ex = e; } - assertNotNull("", it.next()); - assertNull("No exception should be thrown if cursor is not closed.", ex); - assertNotNull("Iterator should not be empty.", it); + assertNotNull("Iterator should not be null.", it); cur1.close(); } From b962bac91708fed08a9d5e9089fe5f308dec29d6 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Thu, 14 Dec 2017 21:59:20 +0300 Subject: [PATCH 07/10] Small fix --- .../ignite/internal/processors/query/h2/IgniteH2Indexing.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index ed93e031ea7..fa2a533d3ed 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2827,7 +2827,7 @@ public boolean reservePartitions( ) throws IgniteCheckedException { assert topVer != null; - if (F.isEmpty(cacheIds)) + if (F.isEmpty(cacheIds) || ctx.clientNode()) return true; Collection partIds = wrap(explicitParts); @@ -2838,7 +2838,7 @@ public boolean reservePartitions( if (cctx == null) // Cache was not found, probably was not deployed yet. return false; - if (cctx.isLocal() || !cctx.rebalanceEnabled() || cctx.localNode().isClient()) + if (cctx.isLocal() || !cctx.rebalanceEnabled()) continue; // For replicated cache topology version does not make sense. From fea349c24139b19621bce54db15681a4decc9401 Mon Sep 17 00:00:00 2001 From: SGrimstad Date: Fri, 14 Sep 2018 18:48:48 +0300 Subject: [PATCH 08/10] IGNITE-7039 master merged and fixed --- .../query/h2/DmlStatementsProcessor.java | 38 +++- .../processors/query/h2/IgniteH2Indexing.java | 84 ++++++--- .../h2/twostep/GridMapQueryExecutor.java | 177 +----------------- ...IgniteCacheLocalQueryReservationsTest.java | 1 - ...exingCachePartitionLossPolicySelfTest.java | 8 +- .../h2/GridIndexingSpiAbstractSelfTest.java | 2 + .../h2/twostep/RetryCauseMessageSelfTest.java | 14 +- 7 files changed, 104 insertions(+), 220 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 7ed16c4f45e..048d0a713c8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -434,8 +434,8 @@ long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedS false, false, 0, - null, - null); + null + ); it = res.iterator(); } @@ -653,7 +653,10 @@ else if (plan.hasRows()) fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(), - fieldsQry.getPartitions()); + cancel, + null, + fieldsQry.getPartitions() + ); cur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { @@ -1130,10 +1133,17 @@ UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, Sql * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public UpdateSourceIterator prepareDistributedUpdate(String schema, Connection conn, - PreparedStatement stmt, SqlFieldsQuery qry, - IndexingQueryFilter filter, GridQueryCancel cancel, boolean local, - AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot) throws IgniteCheckedException { + public UpdateSourceIterator prepareDistributedUpdate( + String schema, + Connection conn, + PreparedStatement stmt, + SqlFieldsQuery qry, + IndexingQueryFilter filter, + GridQueryCancel cancel, + boolean local, + AffinityTopologyVersion topVer, + MvccSnapshot mvccSnapshot + ) throws IgniteCheckedException { Prepared prepared = GridSqlQueryParser.prepared(stmt); @@ -1173,9 +1183,17 @@ else if (!opCtx.isKeepBinary()) new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel).get(0); } else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(), - F.asList(qry.getArgs()), filter, qry.isEnforceJoinOrder(), false, qry.getTimeout(), cancel, - new StaticMvccQueryTracker(cctx, mvccSnapshot)); + final GridQueryFieldsResult res = idx.queryLocalSqlFields( + schema, + plan.selectQuery(), + F.asList(qry.getArgs()), + filter, + qry.isEnforceJoinOrder(), + false, qry.getTimeout(), + cancel, + new StaticMvccQueryTracker(cctx, mvccSnapshot), + qry.getPartitions() + ); cur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index e10c355c376..4f9dd67b79b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -205,7 +205,6 @@ import org.h2.util.JdbcUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE_PORT; @@ -344,11 +343,13 @@ /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocalObjectPool connectionPool = new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5); + private final ThreadLocalObjectPool connectionPool = + new ThreadLocalObjectPool<>(IgniteH2Indexing.this::newConnectionWrapper, 5); /** */ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 - private final ThreadLocal> connCache = new ThreadLocal>() { + private final ThreadLocal> connCache = + new ThreadLocal>() { @Override public ThreadLocalObjectPool.Reusable get() { ThreadLocalObjectPool.Reusable reusable = super.get(); @@ -399,10 +400,10 @@ new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE); /** */ - private final ConcurrentMap reservations = new ConcurrentHashMap8<>(); + private final ConcurrentMap reservations = new ConcurrentHashMap<>(); /** Map from sql string to affected caches ids list */ - private final ConcurrentMap> sqlToCacheIdsCache = new ConcurrentHashMap8<>(); + private final ConcurrentMap> sqlToCacheIdsCache = new ConcurrentHashMap<>(); /** */ private final IgniteInClosure> logger = new IgniteInClosure>() { @@ -1071,40 +1072,60 @@ GridH2IndexBase createSortedIndex(String name, GridH2Table tbl, boolean pk, List * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public GridQueryFieldsResult queryLocalSqlFields(String schemaName, String qry, @Nullable Collection params, - IndexingQueryFilter filter, boolean enforceJoinOrder, boolean startTx, int timeout, - GridQueryCancel cancel) throws IgniteCheckedException { - return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null); + public GridQueryFieldsResult queryLocalSqlFields( + String schemaName, + String qry, + @Nullable Collection params, + IndexingQueryFilter filter, + boolean enforceJoinOrder, + boolean startTx, + int timeout, + GridQueryCancel cancel + ) throws IgniteCheckedException { + return queryLocalSqlFields( + schemaName, + qry, + params, + filter, + enforceJoinOrder, + startTx, + timeout, + cancel, + null, + null); } /** * Queries individual fields (generally used by JDBC drivers). * * @param schemaName Schema name. - * @param qry Query. + * @param qryParam Query. * @param params Query parameters. * @param filter Cache name and key filter. * @param enforceJoinOrder Enforce join order of tables in the query. * @param startTx Start transaction flag. - * @param timeout Query timeout in milliseconds. + * @param timeoutParam Query timeout in milliseconds. * @param cancel Query cancel. - * @param mvccTracker Query tracker. + * @param mvccTrackerParam Query tracker. * @return Query result. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - GridQueryFieldsResult queryLocalSqlFields( + public GridQueryFieldsResult queryLocalSqlFields( final String schemaName, - final String qry, + final String qryParam, @Nullable final Collection params, final IndexingQueryFilter filter, - boolean enforceJoinOrder, - boolean startTx, - final int timeout, + final boolean enforceJoinOrder, + final boolean startTx, + final int timeoutParam, final GridQueryCancel cancel, - MvccQueryTracker mvccTracker, + final MvccQueryTracker mvccTrackerParam, final int[] parts ) throws IgniteCheckedException { + String qry = qryParam; + int timeout = timeoutParam; + MvccQueryTracker mvccTracker = mvccTrackerParam; GridNearTxLocal tx = null; boolean mvccEnabled = mvccEnabled(kernalContext()); assert mvccEnabled || mvccTracker == null; @@ -1148,9 +1169,15 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) { final List reserved = new ArrayList<>(); - if (!reservePartitions(cacheIds, topVer, parts, reserved)) - throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + - ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); + String err = reservePartitions(cacheIds, topVer, parts, reserved, nodeId, run.id()); + + if (!F.isEmpty(err)) + throw new IgniteCheckedException(String.format( + "Failed to reserve partitions for [cacheIds=%s, topVer=%s, parts=%s", + cacheIds, + topVer, + Arrays.toString(parts) + )); final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) .filter(filter) @@ -1611,9 +1638,10 @@ public void bindParameters(PreparedStatement stmt, startTx, timeout, cancel, + null, qry.getPartitions()); - QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { + QueryCursorImpl> cursor = new QueryCursorImpl>(new Iterable>() { @SuppressWarnings("NullableProblems") @Override public Iterator> iterator() { try { @@ -1740,8 +1768,8 @@ public void bindParameters(PreparedStatement stmt, parts = U.toIntArray(filteredParts); } } - - if (!reservePartitions(cacheIds, topVer, parts, reserved)) + String err = reservePartitions(cacheIds, topVer, parts, reserved, nodeId, -1L); + if (!F.isEmpty(err)) throw new IgniteCheckedException("Failed to reserve partitions for [cacheIds=" + cacheIds + ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']'); @@ -3886,7 +3914,7 @@ private boolean hasSystemViews(GridCacheTwoStepQuery twoStepQry) { * @return String which is null in case of success or with causeMessage if failed * @throws IgniteCheckedException If failed. */ - private String reservePartitions( + public String reservePartitions( @Nullable List cacheIds, AffinityTopologyVersion topVer, final int[] explicitParts, @@ -3915,12 +3943,12 @@ private String reservePartitions( continue; // For replicated cache topology version does not make sense. - final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); + final ReservationKey grpKey = new ReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); GridReservable r = reservations.get(grpKey); if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != MapReplicatedReservation.INSTANCE) { + if (r != ReplicatedReservation.INSTANCE) { if (!r.reserve()) return String.format("Failed to reserve partitions for query (group " + "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + @@ -3958,7 +3986,7 @@ private String reservePartitions( } // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE); + reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); } } else { // Reserve primary partitions for partitioned cache (if no explicit given). diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 469e05984f7..a8e9b53a5cf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -21,12 +21,10 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.AbstractCollection; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; @@ -41,7 +39,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterNode; @@ -55,15 +52,10 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; @@ -89,7 +81,6 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2SelectForUpdateTxDetails; import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -97,7 +88,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.thread.IgniteThread; import org.h2.command.Prepared; @@ -106,13 +96,8 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; -import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; @@ -249,7 +234,7 @@ else if (msg instanceof GridH2DmlRequest) if (processed && log.isDebugEnabled()) log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg); } - catch(Throwable th) { + catch (Throwable th) { U.error(log, "Failed to process message: " + msg, th); } } @@ -300,152 +285,6 @@ private MapNodeResults resultsForNode(UUID nodeId) { return nodeRess; } - /** - * @param cctx Cache context. - * @param p Partition ID. - * @return Partition. - */ - private GridDhtLocalPartition partition(GridCacheContext cctx, int p) { - return cctx.topology().localPartition(p, NONE, false); - } - - /** - * @param cacheIds Cache IDs. - * @param topVer Topology version. - * @param explicitParts Explicit partitions list. - * @param reserved Reserved list. - * @return {@code true} If all the needed partitions successfully reserved. - * @throws IgniteCheckedException If failed. - */ - private boolean reservePartitions( - @Nullable List cacheIds, - AffinityTopologyVersion topVer, - final int[] explicitParts, - List reserved - ) throws IgniteCheckedException { - assert topVer != null; - - if (F.isEmpty(cacheIds)) - return true; - - Collection partIds = wrap(explicitParts); - - for (int i = 0; i < cacheIds.size(); i++) { - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheIds.get(i)); - - if (cctx == null) // Cache was not found, probably was not deployed yet. - return false; - - if (cctx.isLocal() || !cctx.rebalanceEnabled()) - continue; - - // For replicated cache topology version does not make sense. - final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); - - GridReservable r = reservations.get(grpKey); - - if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != MapReplicatedReservation.INSTANCE) { - if (!r.reserve()) - return false; // We need explicit partitions here -> retry. - - reserved.add(r); - } - } - else { // Try to reserve partitions one by one. - int partsCnt = cctx.affinity().partitions(); - - if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. - if (r == null) { // Check only once. - for (int p = 0; p < partsCnt; p++) { - GridDhtLocalPartition part = partition(cctx, p); - - // We don't need to reserve partitions because they will not be evicted in replicated caches. - if (part == null || part.state() != OWNING) - return false; - } - - // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE); - } - } - else { // Reserve primary partitions for partitioned cache (if no explicit given). - if (explicitParts == null) - partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - - for (int partId : partIds) { - GridDhtLocalPartition part = partition(cctx, partId); - - if (part == null || part.state() != OWNING || !part.reserve()) - return false; - - reserved.add(part); - - // Double check that we are still in owning state and partition contents are not cleared. - if (part.state() != OWNING) - return false; - } - - if (explicitParts == null) { - // We reserved all the primary partitions for cache, attempt to add group reservation. - GridDhtPartitionsReservation grp = new GridDhtPartitionsReservation(topVer, cctx, "SQL"); - - if (grp.register(reserved.subList(reserved.size() - partIds.size(), reserved.size()))) { - if (reservations.putIfAbsent(grpKey, grp) != null) - throw new IllegalStateException("Reservation already exists."); - - grp.onPublish(new CI1() { - @Override public void apply(GridDhtPartitionsReservation r) { - reservations.remove(grpKey, r); - } - }); - } - } - } - } - } - - return true; - } - - /** - * @param ints Integers. - * @return Collection wrapper. - */ - private static Collection wrap(final int[] ints) { - if (ints == null) - return null; - - if (ints.length == 0) - return Collections.emptySet(); - - return new AbstractCollection() { - @SuppressWarnings("NullableProblems") - @Override public Iterator iterator() { - return new Iterator() { - /** */ - private int i = 0; - - @Override public boolean hasNext() { - return i < ints.length; - } - - @Override public Integer next() { - return ints[i++]; - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override public int size() { - return ints.length; - } - }; - } - /** * @param node Node. * @param req Query request. @@ -453,7 +292,7 @@ private boolean reservePartitions( private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException { int[] qryParts = req.queryPartitions(); - final Map partsMap = req.partitions(); + final Map partsMap = req.partitions(); final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts; @@ -800,7 +639,8 @@ private void onQueryRequest0( // If we are not the target node for this replicated query, just ignore it. if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { - String sql = qry.query(); Collection params0 = F.asList(qry.parameters(params)); + String sql = qry.query(); + Collection params0 = F.asList(qry.parameters(params)); PreparedStatement stmt; @@ -911,7 +751,7 @@ private void onQueryRequest0( if (!lazy) releaseReservations(); } - catch (Throwable e){ + catch (Throwable e) { releaseReservations(); throw e; @@ -933,7 +773,7 @@ private void onQueryRequest0( if (retryErr != null) { final String retryCause = String.format( "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + - "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() + "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() ); sendRetry(node, reqId, segmentId, retryCause); @@ -1180,7 +1020,8 @@ else if (qr.cancelled()) * @return Next page. * @throws IgniteCheckedException If failed. */ - private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, + private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, + int qry, int segmentId, int pageSize, boolean removeMapping) throws IgniteCheckedException { MapQueryResult res = qr.result(qry); @@ -1264,7 +1105,7 @@ private void sendRetry(ClusterNode node, long reqId, int segmentId, String retry boolean loc = node.isLocal(); GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId, - /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, + /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, loc ? null : Collections.emptyList(), loc ? Collections.emptyList() : null, false); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java index 01881480f1e..42d759de62c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java @@ -49,7 +49,6 @@ * Test for local query partitions reservation. */ public class IgniteCacheLocalQueryReservationsTest extends GridCommonAbstractTest { - /** Cache name */ private static final String PERSON_CACHE = "person"; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java index f2085994b0c..87877f86016 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java @@ -72,15 +72,15 @@ validateQuery0(safe, node, false); // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 -// if (execLocQry) -// validateQuery0(safe, node, true); + if (execLocQry) + validateQuery0(safe, node, true); // 2. Check query against LOST partition. validateQuery0(safe, node, false, part); // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 -// if (execLocQry) -// validateQuery0(safe, node, true, part); + if (execLocQry) + validateQuery0(safe, node, true, part); // 3. Check query on remaining partition. if (remainingPart != null) { diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index d0ca346488d..7d472d55ad1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -351,6 +351,7 @@ public void testSpi() throws Exception { false, 0, null, + null, null); String[] aliases = {"N1", "A1", "N2", "A2"}; @@ -417,6 +418,7 @@ public void testLongQueries() throws Exception { false, 0, null, + null, null); assert res.iterator().hasNext(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java index ce385114485..9ae53d792ea 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.ReservationKey; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -124,13 +125,13 @@ public void testSynthCacheWasNotFoundMessage() { public void testGrpReservationFailureMessage() { final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); - final ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + final ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", new MockGridMapQueryExecutor(null) { @Override public void onMessage(UUID nodeId, Object msg) { if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { - final MapReservationKey grpKey = new MapReservationKey(ORG, null); + final ReservationKey grpKey = new ReservationKey(ORG, null); reservations.put(grpKey, new GridReservable() { @@ -267,13 +268,13 @@ public void testPartitionedCacheReserveFailureMessage() { public void testNonCollocatedFailureMessage() { final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); - final ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); + final ConcurrentMap reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations"); GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", new MockGridMapQueryExecutor(null) { @Override public void onMessage(UUID nodeId, Object msg) { if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) { - final MapReservationKey grpKey = new MapReservationKey(ORG, null); + final ReservationKey grpKey = new ReservationKey(ORG, null); reservations.put(grpKey, new GridReservable() { @@ -393,11 +394,6 @@ MockGridMapQueryExecutor insertRealExecutor(GridMapQueryExecutor realExecutor) { return startedExecutor.busyLock(); } - /** {@inheritDoc} */ - @Override public void onCacheStop(String cacheName) { - startedExecutor.onCacheStop(cacheName); - } - /** {@inheritDoc} */ @Override public void stopAndUnregisterCurrentLazyWorker() { startedExecutor.stopAndUnregisterCurrentLazyWorker(); From 27df53d7d23c76d9624292d2507d09c2d59328f8 Mon Sep 17 00:00:00 2001 From: SGrimstad Date: Mon, 17 Sep 2018 15:19:42 +0300 Subject: [PATCH 09/10] IGNITE-7039 adjustments for tests --- .../processors/query/h2/IgniteH2Indexing.java | 2 +- ...IgniteCacheLocalQueryReservationsTest.java | 33 +++++++++++-------- ...exingCachePartitionLossPolicySelfTest.java | 2 -- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 4f9dd67b79b..52a88b704db 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -2400,7 +2400,7 @@ private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); + ", params=" + Arrays.deepToString(qry.getArgs()) + ", reason="+e.getMessage()+"]", e); } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java index 42d759de62c..05f3bba6ab1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLocalQueryReservationsTest.java @@ -55,8 +55,8 @@ /** Timeout */ private static final long TIMEOUT_MS = 30_000L; - /** Cache size. Should be quite enough for a long time of query execution (hundreds of milliseconds).*/ - private static final int CACHE_SIZE = 2_000_000; + /** Cache size. Should be quite enough for a long time of query execution (hundreds of milliseconds). */ + private static final int CACHE_SIZE = 1_000_000; /** * Test for the local {@link SqlFieldsQuery} reservations. @@ -106,8 +106,6 @@ public void testLocalSqlQueryCursorCloseReservations() throws Exception { checkEagerCursorClosingReservations(qry); } - - /** * Checks if a query partitions reservations occurs. * @@ -150,7 +148,7 @@ private void checkReservations(final Query qry) throws Exception { boolean shouldBeReserved = qry instanceof SqlFieldsQuery; final Iterator> it = checkReservationsAfterClosure(shouldBeReserved, parts, - "Partitions should be " + (shouldBeReserved ? "reserved" : "released" ) + " when obtaining iterator.", + "Partitions should be " + (shouldBeReserved ? "reserved" : "released") + " when obtaining iterator.", new IgniteClosure>>() { @Override public Iterator> apply(Object o) { return cursor.iterator(); @@ -187,8 +185,8 @@ private void checkReservations(final Query qry) throws Exception { } /** - * Checks if reservations have been released after the cursor - * has been closed eagerly (before the iterator obtaining) + * Checks if reservations have been released after the cursor has been closed eagerly (before the iterator + * obtaining) * * @param qry Query. * @throws Exception If failed. @@ -249,7 +247,7 @@ private void checkEagerCursorClosingReservations(final Query qry) throws Excepti * @throws Exception If failed. */ private R checkReservationsAfterClosure(final boolean shouldBeReserved, - final List parts, final String msg, final IgniteClosure clo) throws Exception { + final List parts, final String msg, final IgniteClosure clo) throws Exception { final CyclicBarrier crd = new CyclicBarrier(2); Callable reservationsChecker = new Callable() { @@ -261,7 +259,8 @@ private void checkEagerCursorClosingReservations(final Query qry) throws Excepti if (shouldBeReserved) { if (!isReserved(parts, msg)) return false; - } else { + } + else { if (!isReleased(parts, msg)) return false; } @@ -305,7 +304,7 @@ private void checkEagerCursorClosingReservations(final Query qry) throws Excepti return GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - for (Iterator partsIt = partsSet.iterator(); partsIt.hasNext();) { + for (Iterator partsIt = partsSet.iterator(); partsIt.hasNext(); ) { GridDhtLocalPartition part = partsIt.next(); if (part.reservations() == 1) @@ -351,6 +350,12 @@ private boolean isReleased(final List parts, final String /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); + try { + Thread.sleep(3000); + } + catch (InterruptedException e){ + // no op + } } /** @@ -360,9 +365,9 @@ private void createCache() { CacheConfiguration cacheConf = new CacheConfiguration<>(PERSON_CACHE); cacheConf.setCacheMode(CacheMode.PARTITIONED) - .setBackups(0) - .setIndexedTypes(Integer.class, Person.class) - .setName(PERSON_CACHE); + .setBackups(0) + .setIndexedTypes(Integer.class, Person.class) + .setName(PERSON_CACHE); grid(0).createCache(cacheConf); } @@ -376,7 +381,7 @@ private int fillCache() { IgniteDataStreamer streamer = grid(0).dataStreamer(PERSON_CACHE); for (int i = 0; i < CACHE_SIZE; i++) - streamer.addData(i, new Person("p_"+ i, i)); + streamer.addData(i, new Person("p_" + i, i)); streamer.flush(); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java index 87877f86016..af1e50a30f3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IndexingCachePartitionLossPolicySelfTest.java @@ -71,14 +71,12 @@ // 1. Check query against all partitions. validateQuery0(safe, node, false); - // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 if (execLocQry) validateQuery0(safe, node, true); // 2. Check query against LOST partition. validateQuery0(safe, node, false, part); - // TODO: https://issues.apache.org/jira/browse/IGNITE-7039 if (execLocQry) validateQuery0(safe, node, true, part); From c0d909101a830684fce21a767023791884da7c18 Mon Sep 17 00:00:00 2001 From: SGrimstad Date: Mon, 17 Sep 2018 15:32:34 +0300 Subject: [PATCH 10/10] IGNITE-7039 master merge --- .../processors/query/h2/IgniteH2Indexing.java | 135 ++++++++++-------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 1f9456a28fc..ab688b59e13 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -191,9 +191,8 @@ import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; -import org.h2.command.dml.Insert; -import org.h2.command.dml.Select; import org.h2.command.dml.NoOperation; +import org.h2.command.dml.Select; import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.index.Index; @@ -214,13 +213,13 @@ import static org.apache.ignite.IgniteSystemProperties.getString; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; @@ -235,13 +234,12 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; /** - * Indexing implementation based on H2 database engine. In this implementation main query language is SQL, - * fulltext indexing can be performed using Lucene. + * Indexing implementation based on H2 database engine. In this implementation main query language is SQL, fulltext + * indexing can be performed using Lucene. *

- * For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with - * {@code '_key'} and {@code '_val'} fields for key and value, and fields from - * {@link GridQueryTypeDescriptor#fields()}. - * For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}. + * For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with {@code '_key'} and + * {@code '_val'} fields for key and value, and fields from {@link GridQueryTypeDescriptor#fields()}. For each table it + * will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}. */ public class IgniteH2Indexing implements GridQueryIndexing { /** A pattern for commands having internal implementation in Ignite. */ @@ -271,7 +269,7 @@ ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" + ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName(); - // Uncomment this setting to get debug output from H2 to sysout. + // Uncomment this setting to get debug output from H2 to sysout. // ";TRACE_LEVEL_SYSTEM_OUT=3"; /** Dummy metadata for update result. */ @@ -351,35 +349,35 @@ // TODO https://issues.apache.org/jira/browse/IGNITE-9062 private final ThreadLocal> connCache = new ThreadLocal>() { - @Override public ThreadLocalObjectPool.Reusable get() { - ThreadLocalObjectPool.Reusable reusable = super.get(); + @Override public ThreadLocalObjectPool.Reusable get() { + ThreadLocalObjectPool.Reusable reusable = super.get(); - boolean reconnect = true; + boolean reconnect = true; - try { - reconnect = reusable == null || reusable.object().connection().isClosed(); - } - catch (SQLException e) { - U.warn(log, "Failed to check connection status.", e); - } + try { + reconnect = reusable == null || reusable.object().connection().isClosed(); + } + catch (SQLException e) { + U.warn(log, "Failed to check connection status.", e); + } - if (reconnect) { - reusable = initialValue(); + if (reconnect) { + reusable = initialValue(); - set(reusable); - } + set(reusable); + } - return reusable; - } + return reusable; + } - @Override protected ThreadLocalObjectPool.Reusable initialValue() { - ThreadLocalObjectPool.Reusable reusableConnection = connectionPool.borrow(); + @Override protected ThreadLocalObjectPool.Reusable initialValue() { + ThreadLocalObjectPool.Reusable reusableConnection = connectionPool.borrow(); - conns.put(Thread.currentThread(), reusableConnection.object()); + conns.put(Thread.currentThread(), reusableConnection.object()); - return reusableConnection; - } - }; + return reusableConnection; + } + }; /** */ protected volatile GridKernalContext ctx; @@ -465,7 +463,8 @@ private Connection systemConnection() { private H2ConnectionWrapper newConnectionWrapper() { try { return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl)); - } catch (SQLException e) { + } + catch (SQLException e) { throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e); } } @@ -711,6 +710,7 @@ public void executeStatement(String schema, String sql) throws IgniteCheckedExce /** * Execute statement on H2 INFORMATION_SCHEMA. + * * @param sql SQL statement. */ public void executeSystemStatement(String sql) { @@ -779,8 +779,7 @@ private void onSqlException() { GridQueryTypeDescriptor type, CacheDataRow row, @Nullable CacheDataRow prevRow, - boolean prevRowAvailable) throws IgniteCheckedException - { + boolean prevRowAvailable) throws IgniteCheckedException { String cacheName = cctx.name(); H2TableDescriptor tbl = tableDescriptor(schema(cacheName), cacheName, type.name()); @@ -788,7 +787,7 @@ private void onSqlException() { if (tbl == null) return; // Type was rejected. - tbl.table().update(row, prevRow, prevRowAvailable); + tbl.table().update(row, prevRow, prevRowAvailable); if (tbl.luceneIndex() != null) { long expireTime = row.expireTime(); @@ -802,8 +801,7 @@ private void onSqlException() { /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row) - throws IgniteCheckedException - { + throws IgniteCheckedException { if (log.isDebugEnabled()) { log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + row.key() + @@ -938,7 +936,7 @@ private void addInitialUserIndex(String schemaName, H2TableDescriptor desc, Grid /** {@inheritDoc} */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public void dynamicIndexDrop(final String schemaName, String idxName, boolean ifExists) - throws IgniteCheckedException{ + throws IgniteCheckedException { String sql = H2Utils.indexDropSql(schemaName, idxName, ifExists); executeSql(schemaName, sql); @@ -1358,6 +1356,7 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) { throw e; } } + /** * Returns all affected caches ids for the give prepared statement. * @@ -1678,7 +1677,7 @@ public void bindParameters(PreparedStatement stmt, /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public QueryCursor> queryLocalSql(String schemaName, String cacheName, + @Override public QueryCursor> queryLocalSql(String schemaName, String cacheName, final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException { String type = qry.getType(); String sqlQry = qry.getSql(); @@ -1733,8 +1732,8 @@ public void bindParameters(PreparedStatement stmt, * @return Queried rows. * @throws IgniteCheckedException If failed. */ - @SuppressWarnings("unchecked") - GridCloseableIterator> queryLocalSql(String schemaName, String cacheName, + @SuppressWarnings("unchecked") GridCloseableIterator> queryLocalSql(String schemaName, + String cacheName, final String qry, String alias, @Nullable final Collection params, String type, final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { final H2TableDescriptor tbl = tableDescriptor(schemaName, cacheName, type); @@ -1818,6 +1817,7 @@ public void bindParameters(PreparedStatement stmt, /** * Initialises MVCC filter and returns MVCC query tracker if needed. + * * @param stmt Prepared statement. * @param startTx Start transaction flag. * @return MVCC query tracker or {@code null} if MVCC is disabled for involved caches. @@ -1875,9 +1875,9 @@ private static Boolean checkMvcc(PreparedStatement stmt) throws SQLException { for (Object o : parser.objectsMap().values()) { if (o instanceof GridSqlAlias) - o = GridSqlAlias.unwrap((GridSqlAst) o); - if (o instanceof GridSqlTable && ((GridSqlTable) o).dataTable() != null) { - GridCacheContext cctx = ((GridSqlTable) o).dataTable().cache(); + o = GridSqlAlias.unwrap((GridSqlAst)o); + if (o instanceof GridSqlTable && ((GridSqlTable)o).dataTable() != null) { + GridCacheContext cctx = ((GridSqlTable)o).dataTable().cache(); if (mvccEnabled == null) { mvccEnabled = cctx.mvccEnabled(); @@ -2143,7 +2143,7 @@ else if (cmd instanceof SqlSetStreamingCommand) { * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}. */ private void checkQueryType(SqlFieldsQuery qry, boolean isQry) { - Boolean qryFlag = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx) qry).isQuery() : null; + Boolean qryFlag = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).isQuery() : null; if (qryFlag != null && qryFlag != isQry) throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", @@ -2152,6 +2152,7 @@ private void checkQueryType(SqlFieldsQuery qry, boolean isQry) { /** * Process transactional command. + * * @param cmd Command. * @param qry Query. * @throws IgniteCheckedException if failed. @@ -2212,6 +2213,7 @@ else if (cmd instanceof SqlCommitTransactionCommand) { /** * Commit and properly close transaction. + * * @param tx Transaction. * @throws IgniteCheckedException if failed. */ @@ -2228,6 +2230,7 @@ private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException /** * Rollback and properly close transaction. + * * @param tx Transaction. * @throws IgniteCheckedException if failed. */ @@ -2243,6 +2246,7 @@ private void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedExcepti /** * Properly close transaction. + * * @param tx Transaction. * @throws IgniteCheckedException if failed. */ @@ -2355,6 +2359,7 @@ private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException /** * Execute an all-ready {@link SqlFieldsQuery}. + * * @param schemaName Schema name. * @param prepared H2 command. * @param qry Fields query with flags. @@ -2444,17 +2449,18 @@ private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + ", reason="+e.getMessage()+"]", e); + ", params=" + Arrays.deepToString(qry.getArgs()) + ", reason=" + e.getMessage() + "]", e); } } /** * Parse and split query if needed, cache either two-step query or statement. + * * @param schemaName Schema name. * @param qry Query. * @param firstArg Position of the first argument of the following {@code Prepared}. - * @return Result: prepared statement, H2 command, two-step query (if needed), - * metadata for two-step query (if needed), evaluated query local execution flag. + * @return Result: prepared statement, H2 command, two-step query (if needed), metadata for two-step query (if + * needed), evaluated query local execution flag. */ private ParsingResult parseAndSplit(String schemaName, SqlFieldsQuery qry, int firstArg) { Connection c = connectionForSchema(schemaName); @@ -2498,7 +2504,7 @@ private ParsingResult parseAndSplit(String schemaName, SqlFieldsQuery qry, int f args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt); } - if (prepared.isQuery()) { + if (prepared.isQuery()) { try { bindParameters(stmt, F.asList(args)); } @@ -2597,6 +2603,7 @@ private ParsingResult parseAndSplit(String schemaName, SqlFieldsQuery qry, int f /** * Make a copy of {@link SqlFieldsQuery} with all flags and preserving type. + * * @param oldQry Query to copy. * @return Query copy. */ @@ -2606,6 +2613,7 @@ private SqlFieldsQuery cloneFieldsQuery(SqlFieldsQuery oldQry) { /** * Split query into two-step query. + * * @param prepared JDBC prepared statement. * @param qry Original fields query. * @return Two-step query. @@ -2695,6 +2703,7 @@ private boolean isFlagSet(int flags, int flag) { /** * Run distributed query on detected set of partitions. + * * @param schemaName Schema name. * @param qry Original query. * @param twoStepQry Two-step query. @@ -2739,6 +2748,7 @@ private boolean isFlagSet(int flags, int flag) { /** * Do initial parsing of the statement and create query caches, if needed. + * * @param c Connection. * @param sqlQry Query. * @return H2 prepared statement. @@ -2752,10 +2762,10 @@ private PreparedStatement prepareStatementAndCaches(Connection c, String sqlQry) } catch (SQLException e) { if (!cachesCreated && ( - e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) - ) { + e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 || + e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || + e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) + ) { try { ctx.cache().createMissingQueryCaches(); } @@ -2899,7 +2909,7 @@ else if (star > 0) { (upper.startsWith("WHERE") || upper.startsWith("ORDER") || upper.startsWith("LIMIT") ? " " : " WHERE "); - if(tableAlias != null) + if (tableAlias != null) t = tableAlias; qry = "SELECT " + t + "." + KEY_FIELD_NAME + ", " + t + "." + VAL_FIELD_NAME + from + qry; @@ -3200,6 +3210,7 @@ private void cleanupConnections() { /** * Removes from cache and returns associated with current thread connection. + * * @return Connection associated with current thread. */ public ThreadLocalObjectPool.Reusable detach() { @@ -3702,7 +3713,7 @@ private void createSqlFunctions(String schema, Class[] clss) throws IgniteChe int cacheId = CU.cacheId(cacheName); for (Iterator> it = - twoStepCache.entrySet().iterator(); it.hasNext();) { + twoStepCache.entrySet().iterator(); it.hasNext(); ) { Map.Entry e = it.next(); GridCacheTwoStepQuery qry = e.getValue().query(); @@ -3711,7 +3722,7 @@ private void createSqlFunctions(String schema, Class[] clss) throws IgniteChe it.remove(); } - for (Iterator>> it = sqlToCacheIdsCache.entrySet().iterator(); it.hasNext();) { + for (Iterator>> it = sqlToCacheIdsCache.entrySet().iterator(); it.hasNext(); ) { Map.Entry> entry = it.next(); assert entry != null; @@ -3745,7 +3756,6 @@ public AffinityTopologyVersion readyTopologyVersion() { /** * @param readyVer Ready topology version. - * * @return {@code true} If pending distributed exchange exists because server topology is changed. */ public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) { @@ -3788,7 +3798,7 @@ public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws ArrayList list = new ArrayList<>(partInfoList.length); - for (CacheQueryPartitionInfo partInfo: partInfoList) { + for (CacheQueryPartitionInfo partInfo : partInfoList) { int partId = (partInfo.partition() >= 0) ? partInfo.partition() : bindPartitionInfoParameter(partInfo, params); @@ -3829,7 +3839,7 @@ private int bindPartitionInfoParameter(CacheQueryPartitionInfo partInfo, Object[ GridH2RowDescriptor desc = dataTable(schema(partInfo.cacheName()), partInfo.tableName()).rowDescriptor(); Object param = H2Utils.convert(params[partInfo.paramIdx()], - desc, partInfo.dataType()); + desc, partInfo.dataType()); return kernalContext().affinity().partition(partInfo.cacheName(), param); } @@ -3883,7 +3893,7 @@ private int bindPartitionInfoParameter(CacheQueryPartitionInfo partInfo, Object[ List cacheIds = new ArrayList<>(); if (p != null && p instanceof Select) { - Select select = (Select) p; + Select select = (Select)p; Set

tbls = select.getTables(); @@ -3896,7 +3906,6 @@ private int bindPartitionInfoParameter(CacheQueryPartitionInfo partInfo, Object[ return cacheIds; } - /** * Collect cache identifiers from two-step query. * @@ -3998,7 +4007,7 @@ public String reservePartitions( if (!r.reserve()) return String.format("Failed to reserve partitions for query (group " + "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " + - "cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()); + "cacheName=%s]", ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()); reserved.add(r); }