User: sgrimstad Date: 09 Oct 18 18:10 Revision: 4386ea363b1735ec4cf177b6b610a7cb39e613f3 Summary: IGNITE-6677 implementation TeamCity URL: https://ci.ignite.apache.org/viewModification.html?tab=vcsModificationFiles&modId=834229&personal=false Index: modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java =================================================================== --- modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -30,16 +30,13 @@ import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.query.FieldsQueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest; -import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; -import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.processors.query.TaskSqlFields; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; -import org.jetbrains.annotations.Nullable; /** * Tests for streaming via thin driver. @@ -505,12 +502,10 @@ } /** {@inheritDoc} */ - @Override public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, - GridQueryCancel cancel) { + @Override public List>> querySqlFields(TaskSqlFields t) { IndexingWithContext.cliCtx = cliCtx; - return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel); + return super.querySqlFields(t); } } -} \ No newline at end of file +} Index: modules/core/pom.xml =================================================================== --- modules/core/pom.xml (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/core/pom.xml (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -120,7 +120,6 @@ com.h2database h2 ${h2.version} - test Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMetricsCollector.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMetricsCollector.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMetricsCollector.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -0,0 +1,126 @@ +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Service helper to handle collecting of metrics. + */ +public interface GridCacheMetricsCollector { + /** + * End point of collecting metrics. + * @return This instance for chain calls. + */ + GridCacheMetricsCollector finish(); + + /** + * Main call for metrics creation. + * @param qryType Query type to save in metric. + * @param qry Query text to save in metric. + * @param qryMgr Query manager to delegate creation of metrics. + * @param e Error to save in metric. + */ + void collectMetrics(GridCacheQueryType qryType, String qry, GridCacheQueryManager qryMgr, Throwable e); + + /** + * + * @return Time in ms when collecting of metrics was started . + */ + long startTime(); + + /** + * + * @return Difference between finish time and start time in ms. + */ + long duration(); + + /** + * Set flag enables call to query manager. + * (is set by default, used for special cases where custom management required. + * @return This instance for chain calls. + */ + GridCacheMetricsCollector arm(); + + class Default implements GridCacheMetricsCollector { + /** */ + private volatile long startTime; + + /** */ + private volatile long finishTime; + + /** */ + private volatile boolean armed; + + /** */ + private AtomicBoolean finished = new AtomicBoolean(false); + + /** + * + * @return Instance with enabled management of call to query manager. + */ + public static GridCacheMetricsCollector createWithTrigger(){ + return new Default(true); + } + + /** + * + * @return Instance without management of call to query manager. + */ + public static GridCacheMetricsCollector create(){ + return new Default(false); + } + + /** + * + * @param wTrigger With or without management of call to query manager. + */ + public Default(boolean wTrigger){ + armed = !wTrigger; + startTime = U.currentTimeMillis(); + } + + /** {@inheritDoc} */ + @Override public GridCacheMetricsCollector finish() { + if (finished.compareAndSet(false, true)) + finishTime = U.currentTimeMillis(); + + return this; + } + + /** {@inheritDoc} */ + @Override public void collectMetrics( + final GridCacheQueryType qryType, + final String qry, + final GridCacheQueryManager qryMgr, + final Throwable e) { + + GridCacheQueryManager qryMgr0 = qryMgr; + + if (!finished.get()) + throw new IgniteException("Unable to collect metrics with metrics collector that is not finished."); + + if (armed) + qryMgr0.collectMetrics(qryType, qry, startTime(), duration(), e != null); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return startTime; + } + + /** {@inheritDoc} */ + @Override public long duration() { + return finishTime - startTime; + } + + /** {@inheritDoc} */ + @Override public GridCacheMetricsCollector arm() { + armed = true; + + return this; + } + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -423,12 +423,12 @@ if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx, + fut = ctx.kernalContext().query().executeDefferedQuery(//GridCacheQueryType.TEXT, p.getText(), ctx, new IgniteOutClosureX>>() { @Override public CacheQueryFuture> applyx() { return qry.execute(); } - }, false); + }); } else if (filter instanceof SpiQuery) { qry = ctx.queries().createSpiQuery(isKeepBinary); Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Holds the partition calculation info extracted from a query. + * The query may have several such items associated with it. + * + * The query may contain expressions containing key or affinity key. + * Such expressions can be used as hints to derive small isolated set + * of partitions the query needs to run on. + * + * In case expression contains constant (e.g. _key = 100), the partition + * can be calculated right away and saved into cache along with the query. + * + * In case expression has a parameter (e.g. _key = ?), the effective + * partition varies with each run of the query. Hence, instead of partition, + * one must store the info required to calculate partition. + * + * The given class holds the required info, so that effective partition + * can be calculated during query parameter binding. + */ +public class CacheQueryPartitionInfo { + /** */ + private final int partId; + + /** */ + private final String cacheName; + + /** */ + private final String tableName; + + /** */ + private final int dataType; + + /** */ + private final int paramIdx; + + /** + * @param partId Partition id, or -1 if parameter binding required. + * @param cacheName Cache name required for partition calculation. + * @param tableName Table name required for proper type conversion. + * @param dataType Required data type id for the query parameter. + * @param paramIdx Query parameter index required for partition calculation. + */ + public CacheQueryPartitionInfo(int partId, String cacheName, String tableName, int dataType, int paramIdx) { + // In case partition is not known, both cacheName and tableName must be provided. + assert (partId >= 0) ^ ((cacheName != null) && (tableName != null)); + + this.partId = partId; + this.cacheName = cacheName; + this.tableName = tableName; + this.dataType = dataType; + this.paramIdx = paramIdx; + } + + /** + * @return Partition id, or -1 if parameter binding is required to calculate partition. + */ + public int partition() { + return partId; + } + + /** + * @return Cache name required for partition calculation. + */ + public String cacheName() { + return cacheName; + } + + /** + * @return Table name. + */ + public String tableName() { + return tableName; + } + + /** + * @return Required data type for the query parameter. + */ + public int dataType() { + return dataType; + } + + /** + * @return Query parameter index required for partition calculation. + */ + public int paramIdx() { + return paramIdx; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return partId ^ dataType ^ paramIdx ^ + (cacheName == null ? 0 : cacheName.hashCode()) ^ + (tableName == null ? 0 : tableName.hashCode()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (!(obj instanceof CacheQueryPartitionInfo)) + return false; + + CacheQueryPartitionInfo other = (CacheQueryPartitionInfo)obj; + + if (partId >= 0) + return partId == other.partId; + + if (other.cacheName == null || other.tableName == null) + return false; + + return other.cacheName.equals(cacheName) && + other.tableName.equals(tableName) && + other.dataType == dataType && + other.paramIdx == paramIdx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheQueryPartitionInfo.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Two step map-reduce style query. + */ +public class GridCacheTwoStepQuery { + /** */ + public static final int DFLT_PAGE_SIZE = 1000; + + /** */ + @GridToStringInclude + private List mapQrys = new ArrayList<>(); + + /** */ + @GridToStringInclude + private GridCacheSqlQuery rdc; + + /** */ + private int pageSize = DFLT_PAGE_SIZE; + + /** */ + private boolean explain; + + /** */ + private String originalSql; + + /** */ + private Set tbls; + + /** */ + private boolean distributedJoins; + + /** */ + private boolean skipMergeTbl; + + /** */ + private List cacheIds; + + /** */ + private boolean local; + + /** */ + private CacheQueryPartitionInfo[] derivedPartitions; + + /** */ + private boolean mvccEnabled; + + /** {@code FOR UPDATE} flag. */ + private boolean forUpdate; + + /** + * @param originalSql Original query SQL. + * @param tbls Tables in query. + */ + public GridCacheTwoStepQuery(String originalSql, Set tbls) { + this.originalSql = originalSql; + this.tbls = tbls; + } + + /** + * Specify if distributed joins are enabled for this query. + * + * @param distributedJoins Distributed joins enabled. + */ + public void distributedJoins(boolean distributedJoins) { + this.distributedJoins = distributedJoins; + } + + /** + * Check if distributed joins are enabled for this query. + * + * @return {@code true} If distributed joins enabled. + */ + public boolean distributedJoins() { + return distributedJoins; + } + + + /** + * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index. + */ + public boolean skipMergeTable() { + return skipMergeTbl; + } + + /** + * @param skipMergeTbl Skip merge table. + */ + public void skipMergeTable(boolean skipMergeTbl) { + this.skipMergeTbl = skipMergeTbl; + } + + /** + * @return If this is explain query. + */ + public boolean explain() { + return explain; + } + + /** + * @param explain If this is explain query. + */ + public void explain(boolean explain) { + this.explain = explain; + } + + /** + * @param pageSize Page size. + */ + public void pageSize(int pageSize) { + this.pageSize = pageSize; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @param qry SQL Query. + */ + public void addMapQuery(GridCacheSqlQuery qry) { + mapQrys.add(qry); + } + + /** + * @return {@code true} If all the map queries contain only replicated tables. + */ + public boolean isReplicatedOnly() { + assert !mapQrys.isEmpty(); + + for (GridCacheSqlQuery mapQry : mapQrys) { + if (mapQry.isPartitioned()) + return false; + } + + return true; + } + + /** + * @return Reduce query. + */ + public GridCacheSqlQuery reduceQuery() { + return rdc; + } + + /** + * @param rdc Reduce query. + */ + public void reduceQuery(GridCacheSqlQuery rdc) { + this.rdc = rdc; + } + + /** + * @return Map queries. + */ + public List mapQueries() { + return mapQrys; + } + + /** + * @return Cache IDs. + */ + public List cacheIds() { + return cacheIds; + } + + /** + * @param cacheIds Cache IDs. + */ + public void cacheIds(List cacheIds) { + this.cacheIds = cacheIds; + } + + /** + * @return Original query SQL. + */ + public String originalSql() { + return originalSql; + } + + /** + * @return {@code True} If query is local. + */ + public boolean isLocal() { + return local; + } + + /** + * @param local Local query flag. + */ + public void local(boolean local) { + this.local = local; + } + + /** + * @return Query derived partitions info. + */ + public CacheQueryPartitionInfo[] derivedPartitions() { + return this.derivedPartitions; + } + + /** + * @param derivedPartitions Query derived partitions info. + */ + public void derivedPartitions(CacheQueryPartitionInfo[] derivedPartitions) { + this.derivedPartitions = derivedPartitions; + } + + /** + * @return Copy. + */ + public GridCacheTwoStepQuery copy() { + assert !explain; + + GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, tbls); + + cp.cacheIds = cacheIds; + cp.rdc = rdc.copy(); + cp.skipMergeTbl = skipMergeTbl; + cp.pageSize = pageSize; + cp.distributedJoins = distributedJoins; + cp.derivedPartitions = derivedPartitions; + cp.local = local; + cp.mvccEnabled = mvccEnabled; + cp.forUpdate = forUpdate; + + for (int i = 0; i < mapQrys.size(); i++) + cp.mapQrys.add(mapQrys.get(i).copy()); + + return cp; + } + + /** + * @return Nuumber of tables. + */ + public int tablesCount() { + return tbls.size(); + } + + /** + * @return Tables. + */ + public Set tables() { + return tbls; + } + + /** + * @return Mvcc flag. + */ + public boolean mvccEnabled() { + return mvccEnabled; + } + + /** + * @param mvccEnabled Mvcc flag. + */ + public void mvccEnabled(boolean mvccEnabled) { + this.mvccEnabled = mvccEnabled; + } + + /** + * @return {@code FOR UPDATE} flag. + */ + public boolean forUpdate() { + return forUpdate; + } + + /** + * @param forUpdate {@code FOR UPDATE} flag. + */ + public void forUpdate(boolean forUpdate) { + this.forUpdate = forUpdate; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheTwoStepQuery.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * Query table descriptor. + */ +public class QueryTable implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Schema. */ + private String schema; + + /** Table. */ + private String tbl; + + /** + * Defalt constructor. + */ + public QueryTable() { + // No-op. + } + + /** + * Constructor. + * + * @param schema Schema. + * @param tbl Table. + */ + public QueryTable(String schema, String tbl) { + this.schema = schema; + this.tbl = tbl; + } + + /** + * @return Schema. + */ + public String schema() { + return schema; + } + + /** + * @return Table. + */ + public String table() { + return tbl; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("schema", schema)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeString("tbl", tbl)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + schema = reader.readString("schema"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + tbl = reader.readString("tbl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(QueryTable.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -54; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * (schema != null ? schema.hashCode() : 0) + (tbl != null ? tbl.hashCode() : 0); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj instanceof QueryTable) { + QueryTable other = (QueryTable)obj; + + return F.eq(tbl, other.tbl) && F.eq(schema, other.schema); + } + + return super.equals(obj); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTable.class, this); + } +} Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -83,16 +83,10 @@ /** * Detect whether SQL query should be executed in distributed or local manner and execute it. - * @param schemaName Schema name. - * @param qry Query. - * @param cliCtx Client context. - * @param keepBinary Keep binary flag. - * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query. - * @param tracker Query tracker. + * @param task Holder of parameters * @return Cursor. */ - public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel); + public List>> querySqlFields(TaskSqlFields task); /** * Execute an INSERT statement using data streamer as receiver. @@ -135,15 +129,10 @@ /** * Queries individual fields (generally used by JDBC drivers). * - * @param schemaName Schema name. - * @param qry Query. - * @param keepBinary Keep binary flag. - * @param filter Cache name and key filter. - * @param cancel Query cancel. + * @param task Holder of parameters * @return Cursor. */ - public FieldsQueryCursor> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException; + public FieldsQueryCursor> queryLocalSqlFields(TaskSqlFields task) throws IgniteCheckedException; /** * Executes text query. Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMetricsCollector; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; @@ -122,7 +123,6 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA; import static org.apache.ignite.internal.IgniteComponentType.INDEXING; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL; -import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; /** * Indexing processor. @@ -914,7 +914,7 @@ if (!dscoMsgIdHist.add(id)) { U.warn(log, "Received duplicate schema custom discovery message (will ignore) [opId=" + - msg.operation().id() + ", msg=" + msg +']'); + msg.operation().id() + ", msg=" + msg + ']'); return; } @@ -933,7 +933,7 @@ } else U.warn(log, "Received unsupported schema custom discovery message (will ignore) [opId=" + - msg.operation().id() + ", msg=" + msg +']'); + msg.operation().id() + ", msg=" + msg + ']'); } /** @@ -1555,7 +1555,7 @@ } if (!res && !ifNotExists) - throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, entity.getTableName()); + throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, entity.getTableName()); } /** @@ -2133,20 +2133,29 @@ try { IgniteOutClosureX>>> clo = new IgniteOutClosureX>>>() { - @Override public List>> applyx() throws IgniteCheckedException { - GridQueryCancel cancel = new GridQueryCancel(); + @Override public List>> applyx() throws IgniteCheckedException { + GridQueryCancel cancel = new GridQueryCancel(); - List>> res = - idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, null, cancel); + List>> res = idx.querySqlFields(TaskSqlFields + .create() + .addSchemaName(schemaName) + .addSqlFieldsQuery(qry) + .addSqlClientContext(cliCtx) + .addKeepBinary(keepBinary) + .addFailOnMultipleStmts(failOnMultipleStmts) + .addGridQueryCancel(cancel) + .addQuerySubmitted(qry.getSql()) + .addQueryTypeSubmitted(GridCacheQueryType.SQL_FIELDS) + ); - if (cctx != null) - sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); + if (cctx != null) + sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); - return res; - } - }; + return res; + } + }; - return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, clo, true); + return executeDefferedQuery(clo); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -2210,7 +2219,7 @@ @Override public Long applyx() throws IgniteCheckedException { return idx.streamUpdateQuery(schemaName, qry, args, streamer); } - }, true); + }, false); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -2237,7 +2246,7 @@ @Override public List applyx() throws IgniteCheckedException { return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx); } - }, true); + }, false); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -2293,7 +2302,7 @@ @Override public QueryCursor> applyx() throws IgniteCheckedException { return idx.queryDistributedSql(schemaName, cctx.name(), qry, keepBinary); } - }, true); + }, false); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2340,7 +2349,7 @@ return idx.queryLocalSql(schemaName, cctx.name(), qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()), keepBinary); } - }, true); + },true); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -2525,15 +2534,15 @@ for (QueryField col : cols) { try { props.add(new QueryBinaryProperty( - ctx, + ctx, col.name(), - null, - Class.forName(col.typeName()), - false, - null, - !col.isNullable(), - null, - col.precision(), + null, + Class.forName(col.typeName()), + false, + null, + !col.isNullable(), + null, + col.precision(), col.scale())); } catch (ClassNotFoundException e) { @@ -2598,7 +2607,7 @@ if (desc == null) return; - idx.remove(cctx, desc, row); + idx.remove(cctx, desc, row); } finally { busyLock.leaveBusy(); @@ -2702,7 +2711,7 @@ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public R executeQuery(GridCacheQueryType qryType, String qry, @Nullable GridCacheContext cctx, IgniteOutClosureX clo, boolean complete) throws IgniteCheckedException { - final long startTime = U.currentTimeMillis(); + final GridCacheMetricsCollector metrCol = GridCacheMetricsCollector.Default.create(); Throwable err = null; @@ -2735,23 +2744,53 @@ throw new IgniteCheckedException(e); } finally { - if (!SQL_FIELDS.equals(qryType)) { - boolean failed = err != null; + boolean failed = err != null; - long duration = U.currentTimeMillis() - startTime; - - if (complete || failed) { + if (complete || failed) { - if (cctx != null) - cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed); + metrCol.finish(); + assert cctx != null; + + metrCol.collectMetrics(qryType, qry, cctx.queries(), err); + - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) - log.trace("Query execution [startTime=" + startTime + ", duration=" + duration + - ", fail=" + failed + ", res=" + res + ']'); + log.trace(String.format("Query execution [startTime=%d, duration=%d, fail=%s, res=%s]", + metrCol.startTime(), + metrCol.duration(), + failed, + res + )); - } - } - } + } + } + } + + /** + * @param clo Closure. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public R executeDefferedQuery(IgniteOutClosureX clo) throws IgniteCheckedException { + try { + R res = clo.apply(); + + if (res instanceof CacheQueryFuture) { + CacheQueryFuture fut = (CacheQueryFuture)res; + + fut.error(); - } + } + return res; + } + catch (GridClosureException e) { + throw (IgniteCheckedException)(e.unwrap()); + } + catch (CacheException | IgniteException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + /** * Send status message to coordinator node. * @@ -2892,7 +2931,6 @@ } } - /** * @return Value object context. */ @@ -3075,10 +3113,10 @@ // thread under load. Hence, moving short-lived operation to separate worker. new IgniteThread(ctx.igniteInstanceName(), "schema-circuit-breaker-" + op.id(), new Runnable() { - @Override public void run() { - onSchemaPropose(nextOp.proposeMessage()); - } - }).start(); + @Override public void run() { + onSchemaPropose(nextOp.proposeMessage()); + } + }).start(); } } } Index: modules/core/src/main/java/org/apache/ignite/internal/processors/query/TaskSqlFields.java =================================================================== --- modules/core/src/main/java/org/apache/ignite/internal/processors/query/TaskSqlFields.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) +++ modules/core/src/main/java/org/apache/ignite/internal/processors/query/TaskSqlFields.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -0,0 +1,306 @@ +package org.apache.ignite.internal.processors.query; + +import java.util.List; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.h2.command.Prepared; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; + +/** + * Holder for all changing set of parameters passed for executing queries SQL fields + */ +public class TaskSqlFields { + /** */ + private String schemaName; + + /** */ + private SqlFieldsQuery qry; + + /** */ + private SqlClientContext cliCtx; + + /** */ + private boolean keepBinary; + + /** */ + private boolean failOnMultipleStmts; + + /** */ + private MvccQueryTracker tracker; + + /** */ + private GridQueryCancel cancel; + + /** */ + private boolean startTx; + + /** */ + private List meta; + + /** */ + private Prepared preparedCmd; + + /** */ + private GridCacheTwoStepQuery twoStepQry; + + /** */ + private IndexingQueryFilter filter; + + /** */ + private int[] cacheIds = new int[0]; + + /** */ + private String qrySubmitted; + + /** */ + private GridCacheQueryType qryTypeSubmitted; + + /** + * + * @return New instance. + */ + public static TaskSqlFields create() { + return new TaskSqlFields(); + } + + /** */ + private TaskSqlFields() { + } + + /** + * + * @param schemaName Schema name. + * @return This instance for chain calls. + */ + public TaskSqlFields addSchemaName(String schemaName) { + this.schemaName = schemaName; + + return this; + } + + /** + * + * @param qry Query. + * @return This instance for chain calls. + */ + public TaskSqlFields addSqlFieldsQuery(SqlFieldsQuery qry) { + this.qry = qry; + + return this; + } + + /** + * + * @param cliCtx Client context. + * @return This instance for chain calls. + */ + public TaskSqlFields addSqlClientContext(SqlClientContext cliCtx) { + this.cliCtx = cliCtx; + + return this; + } + + /** + * + * @param keepBinary If to keep binary. + * @return This instance for chain calls. + */ + public TaskSqlFields addKeepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + + return this; + } + + /** + * + * @param failOnMultipleStmts If to fail on multiple statements. + * @return This instance for chain calls. + */ + public TaskSqlFields addFailOnMultipleStmts(boolean failOnMultipleStmts) { + this.failOnMultipleStmts = failOnMultipleStmts; + + return this; + } + + /** + * + * @param tracker Mvcc query tracker. + * @return This instance for chain calls. + */ + public TaskSqlFields addMvccQueryTracker(MvccQueryTracker tracker) { + this.tracker = tracker; + + return this; + } + + /** + * + * @param cancel Query cancel. + * @return This instance for chain calls. + */ + public TaskSqlFields addGridQueryCancel(GridQueryCancel cancel) { + this.cancel = cancel; + + return this; + } + + /** + * + * @param startTx If to start transaction. + * @return This instance for chain calls. + */ + public TaskSqlFields addStartTx(boolean startTx) { + this.startTx = startTx; + + return this; + } + + /** + * + * @param meta List of mete data. + * @return This instance for chain calls. + */ + public TaskSqlFields addMeta(List meta) { + this.meta = meta; + + return this; + } + + /** + * + * @param preparedCmd Prepared command. + * @return This instance for chain calls. + */ + public TaskSqlFields addPreparedCmd(Prepared preparedCmd) { + this.preparedCmd = preparedCmd; + + return this; + } + + /** + * + * @param twoStepQry Two step query. + * @return This instance for chain calls. + */ + public TaskSqlFields addTwoStepQry(GridCacheTwoStepQuery twoStepQry) { + this.twoStepQry = twoStepQry; + + return this; + } + + public TaskSqlFields addFilter(IndexingQueryFilter filter) { + this.filter = filter; + + return this; + } + + /** + * + * @param cacheIds Array of cache ids. + * @return This instance for chain calls. + */ + public TaskSqlFields addCacheIds(int[] cacheIds) { + this.cacheIds = cacheIds; + + return this; + } + + /** + * + * @param qrySubmitted Text of query that was actually submitted. + * @return This instance for chain calls. + */ + public TaskSqlFields addQuerySubmitted(String qrySubmitted) { + this.qrySubmitted = qrySubmitted; + + return this; + } + + /** + * + * @param qryTypeSubmitted Text of query that was actually submitted. + * @return This instance for chain calls. + */ + public TaskSqlFields addQueryTypeSubmitted(GridCacheQueryType qryTypeSubmitted) { + this.qryTypeSubmitted = qryTypeSubmitted; + + return this; + } + + /** */ + public String schemaName() { + return schemaName; + } + + /** */ + public SqlFieldsQuery qry() { + return qry; + } + + /** */ + public SqlClientContext cliCtx() { + return cliCtx; + } + + /** */ + public boolean keepBinary() { + return keepBinary; + } + + /** */ + public boolean failOnMultipleStmts() { + return failOnMultipleStmts; + } + + /** */ + public MvccQueryTracker tracker() { + return tracker; + } + + /** */ + public GridQueryCancel cancel() { + return cancel; + } + + /** */ + public boolean startTx() { + return startTx; + } + + /** */ + public List meta() { + return meta; + } + + /** */ + public Prepared preparedCommand() { + return preparedCmd; + } + + /** */ + public GridCacheTwoStepQuery twoStepQuery() { + return twoStepQry; + } + + /** */ + public IndexingQueryFilter filter() { + return filter; + } + + /** */ + public int[] cacheIds() { + return cacheIds; + } + + /** */ + public String querySubmitted() { + return qrySubmitted; + } + + /** */ + public GridCacheQueryType queryTypeSubmitted() { + return qryTypeSubmitted; + } +} Index: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java =================================================================== --- modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.QueryField; import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.processors.query.TaskSqlFields; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -247,8 +248,7 @@ } /** {@inheritDoc} */ - @Override public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel) { + @Override public List>> querySqlFields(TaskSqlFields tsk) { return null; } @@ -271,8 +271,7 @@ } /** {@inheritDoc} */ - @Override public FieldsQueryCursor> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { + @Override public FieldsQueryCursor> queryLocalSqlFields(TaskSqlFields t) throws IgniteCheckedException { return null; } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query; - -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Holds the partition calculation info extracted from a query. - * The query may have several such items associated with it. - * - * The query may contain expressions containing key or affinity key. - * Such expressions can be used as hints to derive small isolated set - * of partitions the query needs to run on. - * - * In case expression contains constant (e.g. _key = 100), the partition - * can be calculated right away and saved into cache along with the query. - * - * In case expression has a parameter (e.g. _key = ?), the effective - * partition varies with each run of the query. Hence, instead of partition, - * one must store the info required to calculate partition. - * - * The given class holds the required info, so that effective partition - * can be calculated during query parameter binding. - */ -public class CacheQueryPartitionInfo { - /** */ - private final int partId; - - /** */ - private final String cacheName; - - /** */ - private final String tableName; - - /** */ - private final int dataType; - - /** */ - private final int paramIdx; - - /** - * @param partId Partition id, or -1 if parameter binding required. - * @param cacheName Cache name required for partition calculation. - * @param tableName Table name required for proper type conversion. - * @param dataType Required data type id for the query parameter. - * @param paramIdx Query parameter index required for partition calculation. - */ - public CacheQueryPartitionInfo(int partId, String cacheName, String tableName, int dataType, int paramIdx) { - // In case partition is not known, both cacheName and tableName must be provided. - assert (partId >= 0) ^ ((cacheName != null) && (tableName != null)); - - this.partId = partId; - this.cacheName = cacheName; - this.tableName = tableName; - this.dataType = dataType; - this.paramIdx = paramIdx; - } - - /** - * @return Partition id, or -1 if parameter binding is required to calculate partition. - */ - public int partition() { - return partId; - } - - /** - * @return Cache name required for partition calculation. - */ - public String cacheName() { - return cacheName; - } - - /** - * @return Table name. - */ - public String tableName() { - return tableName; - } - - /** - * @return Required data type for the query parameter. - */ - public int dataType() { - return dataType; - } - - /** - * @return Query parameter index required for partition calculation. - */ - public int paramIdx() { - return paramIdx; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return partId ^ dataType ^ paramIdx ^ - (cacheName == null ? 0 : cacheName.hashCode()) ^ - (tableName == null ? 0 : tableName.hashCode()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean equals(Object obj) { - if (this == obj) - return true; - - if (!(obj instanceof CacheQueryPartitionInfo)) - return false; - - CacheQueryPartitionInfo other = (CacheQueryPartitionInfo)obj; - - if (partId >= 0) - return partId == other.partId; - - if (other.cacheName == null || other.tableName == null) - return false; - - return other.cacheName.equals(cacheName) && - other.tableName.equals(tableName) && - other.dataType == dataType && - other.paramIdx == paramIdx; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheQueryPartitionInfo.class, this); - } -} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Two step map-reduce style query. - */ -public class GridCacheTwoStepQuery { - /** */ - public static final int DFLT_PAGE_SIZE = 1000; - - /** */ - @GridToStringInclude - private List mapQrys = new ArrayList<>(); - - /** */ - @GridToStringInclude - private GridCacheSqlQuery rdc; - - /** */ - private int pageSize = DFLT_PAGE_SIZE; - - /** */ - private boolean explain; - - /** */ - private String originalSql; - - /** */ - private Set tbls; - - /** */ - private boolean distributedJoins; - - /** */ - private boolean skipMergeTbl; - - /** */ - private List cacheIds; - - /** */ - private boolean local; - - /** */ - private CacheQueryPartitionInfo[] derivedPartitions; - - /** */ - private boolean mvccEnabled; - - /** {@code FOR UPDATE} flag. */ - private boolean forUpdate; - - /** - * @param originalSql Original query SQL. - * @param tbls Tables in query. - */ - public GridCacheTwoStepQuery(String originalSql, Set tbls) { - this.originalSql = originalSql; - this.tbls = tbls; - } - - /** - * Specify if distributed joins are enabled for this query. - * - * @param distributedJoins Distributed joins enabled. - */ - public void distributedJoins(boolean distributedJoins) { - this.distributedJoins = distributedJoins; - } - - /** - * Check if distributed joins are enabled for this query. - * - * @return {@code true} If distributed joins enabled. - */ - public boolean distributedJoins() { - return distributedJoins; - } - - - /** - * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index. - */ - public boolean skipMergeTable() { - return skipMergeTbl; - } - - /** - * @param skipMergeTbl Skip merge table. - */ - public void skipMergeTable(boolean skipMergeTbl) { - this.skipMergeTbl = skipMergeTbl; - } - - /** - * @return If this is explain query. - */ - public boolean explain() { - return explain; - } - - /** - * @param explain If this is explain query. - */ - public void explain(boolean explain) { - this.explain = explain; - } - - /** - * @param pageSize Page size. - */ - public void pageSize(int pageSize) { - this.pageSize = pageSize; - } - - /** - * @return Page size. - */ - public int pageSize() { - return pageSize; - } - - /** - * @param qry SQL Query. - */ - public void addMapQuery(GridCacheSqlQuery qry) { - mapQrys.add(qry); - } - - /** - * @return {@code true} If all the map queries contain only replicated tables. - */ - public boolean isReplicatedOnly() { - assert !mapQrys.isEmpty(); - - for (GridCacheSqlQuery mapQry : mapQrys) { - if (mapQry.isPartitioned()) - return false; - } - - return true; - } - - /** - * @return Reduce query. - */ - public GridCacheSqlQuery reduceQuery() { - return rdc; - } - - /** - * @param rdc Reduce query. - */ - public void reduceQuery(GridCacheSqlQuery rdc) { - this.rdc = rdc; - } - - /** - * @return Map queries. - */ - public List mapQueries() { - return mapQrys; - } - - /** - * @return Cache IDs. - */ - public List cacheIds() { - return cacheIds; - } - - /** - * @param cacheIds Cache IDs. - */ - public void cacheIds(List cacheIds) { - this.cacheIds = cacheIds; - } - - /** - * @return Original query SQL. - */ - public String originalSql() { - return originalSql; - } - - /** - * @return {@code True} If query is local. - */ - public boolean isLocal() { - return local; - } - - /** - * @param local Local query flag. - */ - public void local(boolean local) { - this.local = local; - } - - /** - * @return Query derived partitions info. - */ - public CacheQueryPartitionInfo[] derivedPartitions() { - return this.derivedPartitions; - } - - /** - * @param derivedPartitions Query derived partitions info. - */ - public void derivedPartitions(CacheQueryPartitionInfo[] derivedPartitions) { - this.derivedPartitions = derivedPartitions; - } - - /** - * @return Copy. - */ - public GridCacheTwoStepQuery copy() { - assert !explain; - - GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, tbls); - - cp.cacheIds = cacheIds; - cp.rdc = rdc.copy(); - cp.skipMergeTbl = skipMergeTbl; - cp.pageSize = pageSize; - cp.distributedJoins = distributedJoins; - cp.derivedPartitions = derivedPartitions; - cp.local = local; - cp.mvccEnabled = mvccEnabled; - cp.forUpdate = forUpdate; - - for (int i = 0; i < mapQrys.size(); i++) - cp.mapQrys.add(mapQrys.get(i).copy()); - - return cp; - } - - /** - * @return Nuumber of tables. - */ - public int tablesCount() { - return tbls.size(); - } - - /** - * @return Tables. - */ - public Set tables() { - return tbls; - } - - /** - * @return Mvcc flag. - */ - public boolean mvccEnabled() { - return mvccEnabled; - } - - /** - * @param mvccEnabled Mvcc flag. - */ - public void mvccEnabled(boolean mvccEnabled) { - this.mvccEnabled = mvccEnabled; - } - - /** - * @return {@code FOR UPDATE} flag. - */ - public boolean forUpdate() { - return forUpdate; - } - - /** - * @param forUpdate {@code FOR UPDATE} flag. - */ - public void forUpdate(boolean forUpdate) { - this.forUpdate = forUpdate; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheTwoStepQuery.class, this); - } -} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query; - -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -import java.nio.ByteBuffer; - -/** - * Query table descriptor. - */ -public class QueryTable implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Schema. */ - private String schema; - - /** Table. */ - private String tbl; - - /** - * Defalt constructor. - */ - public QueryTable() { - // No-op. - } - - /** - * Constructor. - * - * @param schema Schema. - * @param tbl Table. - */ - public QueryTable(String schema, String tbl) { - this.schema = schema; - this.tbl = tbl; - } - - /** - * @return Schema. - */ - public String schema() { - return schema; - } - - /** - * @return Table. - */ - public String table() { - return tbl; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeString("schema", schema)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeString("tbl", tbl)) - return false; - - writer.incrementState(); - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - schema = reader.readString("schema"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - tbl = reader.readString("tbl"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } - - return reader.afterMessageRead(QueryTable.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return -54; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return 31 * (schema != null ? schema.hashCode() : 0) + (tbl != null ? tbl.hashCode() : 0); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (obj instanceof QueryTable) { - QueryTable other = (QueryTable)obj; - - return F.eq(tbl, other.tbl) && F.eq(schema, other.schema); - } - - return super.equals(obj); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTable.class, this); - } -} Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -52,10 +52,12 @@ import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMetricsCollector; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.odbc.SqlStateCode; @@ -66,6 +68,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.TaskSqlFields; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; @@ -426,9 +429,18 @@ Iterator> it; if (!F.isEmpty(plan.selectQuery())) { - GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), - plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), - null, false, false, 0, null); + GridQueryFieldsResult res = idx.queryLocalSqlFields( + new int[0], + idx.schema(cctx.name()), + plan.selectQuery(), + F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)), + null, + false, + false, + 0, + null, + null + ); it = res.iterator(); } @@ -488,170 +500,219 @@ private UpdateResult executeUpdateStatement(String schemaName, final UpdatePlan plan, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { + GridCacheMetricsCollector metrCol = GridCacheMetricsCollector.Default.createWithTrigger(); + GridCacheContext cctx = plan.cacheContext(); + try { - if (cctx != null && cctx.mvccEnabled()) { - assert cctx.transactional(); + if (cctx != null && cctx.mvccEnabled()) { + assert cctx.transactional(); - DmlDistributedPlanInfo distributedPlan = plan.distributedPlan(); + DmlDistributedPlanInfo distributedPlan = plan.distributedPlan(); - GridNearTxLocal tx = tx(cctx.kernalContext()); + GridNearTxLocal tx = tx(cctx.kernalContext()); - boolean implicit = (tx == null); + boolean implicit = (tx == null); - boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) || - ((SqlFieldsQueryEx)fieldsQry).isAutoCommit()); + boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) || + ((SqlFieldsQueryEx)fieldsQry).isAutoCommit()); - if (implicit) - tx = txStart(cctx, fieldsQry.getTimeout()); + if (implicit) + tx = txStart(cctx, fieldsQry.getTimeout()); - requestSnapshot(cctx, checkActive(tx)); + requestSnapshot(cctx, checkActive(tx)); - try (GridNearTxLocal toCommit = commit ? tx : null) { - long timeout = implicit - ? tx.remainingTime() - : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx); + try (GridNearTxLocal toCommit = commit ? tx : null) { + long timeout = implicit + ? tx.remainingTime() + : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx); - if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT - || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) { + if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT + || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) { - boolean sequential = true; + boolean sequential = true; - UpdateSourceIterator it; + UpdateSourceIterator it; - if (plan.fastResult()) { + if (plan.fastResult()) { + metrCol.arm(); + - IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs()); + IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs()); - EnlistOperation op = UpdatePlan.enlistOperation(plan.mode()); + EnlistOperation op = UpdatePlan.enlistOperation(plan.mode()); - it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row); - } + it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row); + } - else if (plan.hasRows()) + else if (plan.hasRows()) { + metrCol.arm(); + - it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs())); + it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs())); + } - else { - // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false). - SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) - .setArgs(fieldsQry.getArgs()) - .setDistributedJoins(fieldsQry.isDistributedJoins()) - .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) - .setLocal(fieldsQry.isLocal()) - .setPageSize(fieldsQry.getPageSize()) - .setTimeout((int)timeout, TimeUnit.MILLISECONDS); + else { + // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false). + SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) + .setArgs(fieldsQry.getArgs()) + .setDistributedJoins(fieldsQry.isDistributedJoins()) + .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) + .setLocal(fieldsQry.isLocal()) + .setPageSize(fieldsQry.getPageSize()) + .setTimeout((int)timeout, TimeUnit.MILLISECONDS); - FieldsQueryCursor> cur = idx.querySqlFields(schemaName, newFieldsQry, null, - true, true, mvccTracker(cctx, tx), cancel).get(0); + FieldsQueryCursor> cur = idx.querySqlFields( + TaskSqlFields + .create() + .addSchemaName(schemaName) + .addSqlFieldsQuery(newFieldsQry) + .addKeepBinary(true) + .addFailOnMultipleStmts(true) + .addMvccQueryTracker(mvccTracker(cctx, tx)) + .addGridQueryCancel(cancel) + .addQuerySubmitted(fieldsQry.getSql()) + .addQueryTypeSubmitted(GridCacheQueryType.SQL_FIELDS) + ).get(0); - it = plan.iteratorForTransaction(idx, cur); - } + it = plan.iteratorForTransaction(idx, cur); + } - IgniteInternalFuture fut = tx.updateAsync(cctx, it, - fieldsQry.getPageSize(), timeout, sequential); + IgniteInternalFuture fut = tx.updateAsync(cctx, it, + fieldsQry.getPageSize(), timeout, sequential); - UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY); + UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY); - if (commit) - toCommit.commit(); + if (commit) + toCommit.commit(); - return res; - } + return res; + } - int[] ids = U.toIntArray(distributedPlan.getCacheIds()); + int[] ids = U.toIntArray(distributedPlan.getCacheIds()); - int flags = 0; + int flags = 0; - if (fieldsQry.isEnforceJoinOrder()) - flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; + if (fieldsQry.isEnforceJoinOrder()) + flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER; - if (distributedPlan.isReplicatedOnly()) - flags |= GridH2QueryRequest.FLAG_REPLICATED; + if (distributedPlan.isReplicatedOnly()) + flags |= GridH2QueryRequest.FLAG_REPLICATED; - int[] parts = fieldsQry.getPartitions(); + int[] parts = fieldsQry.getPartitions(); - IgniteInternalFuture fut = tx.updateAsync( - cctx, - ids, - parts, - schemaName, - fieldsQry.getSql(), - fieldsQry.getArgs(), - flags, - fieldsQry.getPageSize(), - timeout); + IgniteInternalFuture fut = tx.updateAsync( + cctx, + ids, + parts, + schemaName, + fieldsQry.getSql(), + fieldsQry.getArgs(), + flags, + fieldsQry.getPageSize(), + timeout); - UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY); + UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY); - if (commit) - toCommit.commit(); + if (commit) + toCommit.commit(); - return res; - } - catch (IgniteCheckedException e) { - checkSqlException(e); + return res; + } + catch (IgniteCheckedException e) { + checkSqlException(e); - U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", e); + U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", e); - throw new IgniteSQLException("Failed to run update. " + e.getMessage(), e); - } - finally { - if (commit) - cctx.tm().resetContext(); - } - } + throw new IgniteSQLException("Failed to run update. " + e.getMessage(), e); + } + finally { + if (commit) + cctx.tm().resetContext(); + } + } - UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs()); + UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs()); - if (fastUpdateRes != null) + if (fastUpdateRes != null) { + metrCol.arm(); + - return fastUpdateRes; + return fastUpdateRes; + } - if (plan.distributedPlan() != null) { - UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); + if (plan.distributedPlan() != null) { + UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); - // null is returned in case not all nodes support distributed DML. + // null is returned in case not all nodes support distributed DML. - if (result != null) + if (result != null) { + metrCol.arm(); + - return result; - } + return result; + } + } - Iterable> cur; + Iterable> cur; - // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual - // sub-query and not some dummy stuff like "select 1, 2, 3;" - if (!loc && !plan.isLocalSubquery()) { - assert !F.isEmpty(plan.selectQuery()); + // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual + // sub-query and not some dummy stuff like "select 1, 2, 3;" + if (!loc && !plan.isLocalSubquery()) { + assert !F.isEmpty(plan.selectQuery()); - SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) - .setArgs(fieldsQry.getArgs()) - .setDistributedJoins(fieldsQry.isDistributedJoins()) - .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) - .setLocal(fieldsQry.isLocal()) - .setPageSize(fieldsQry.getPageSize()) - .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); + SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) + .setArgs(fieldsQry.getArgs()) + .setDistributedJoins(fieldsQry.isDistributedJoins()) + .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) + .setLocal(fieldsQry.isLocal()) + .setPageSize(fieldsQry.getPageSize()) + .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true, - null, cancel).get(0); + cur = (QueryCursorImpl>)idx.querySqlFields(TaskSqlFields + .create() + .addSchemaName(schemaName) + .addSqlFieldsQuery(newFieldsQry) + .addKeepBinary(true) + .addFailOnMultipleStmts(true) + .addGridQueryCancel(cancel) + .addQuerySubmitted(fieldsQry.getSql()) + .addQueryTypeSubmitted(GridCacheQueryType.SQL_FIELDS) + ).get(0); - } + } - else if (plan.hasRows()) + else if (plan.hasRows()) { + metrCol.arm(); + - cur = plan.createRows(fieldsQry.getArgs()); + cur = plan.createRows(fieldsQry.getArgs()); + } - else { + else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(), - F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(), - cancel); + final GridQueryFieldsResult res = idx.queryLocalSqlFields( + new int[0], + schemaName, + plan.selectQuery(), + F.asList(fieldsQry.getArgs()), + filters, + fieldsQry.isEnforceJoinOrder(), + false, + fieldsQry.getTimeout(), + cancel, + null + ); - cur = new QueryCursorImpl<>(new Iterable>() { - @Override public Iterator> iterator() { - try { - return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }, cancel); - } + cur = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + try { + return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, cancel); + } - int pageSize = loc ? 0 : fieldsQry.getPageSize(); + int pageSize = loc ? 0 : fieldsQry.getPageSize(); - return processDmlSelectResult(plan, cur, pageSize); + return processDmlSelectResult(plan, cur, pageSize); + } finally { + metrCol.finish(); + + metrCol.collectMetrics(GridCacheQueryType.SQL_FIELDS, fieldsQry.getSql(), cctx.queries(), null); - } + } + } /** * @param e Exception. @@ -1150,14 +1211,35 @@ .setPageSize(qry.getPageSize()) .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl>)idx.querySqlFields(schema, newFieldsQry, null, true, true, - new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel).get(0); + cur = (QueryCursorImpl>)idx.querySqlFields(TaskSqlFields + .create() + .addSchemaName(schema) + .addSqlFieldsQuery(newFieldsQry) + .addKeepBinary(true) + .addFailOnMultipleStmts(true) + .addMvccQueryTracker(new StaticMvccQueryTracker(cctx, mvccSnapshot)) + .addGridQueryCancel(cancel) + .addQuerySubmitted(qry.getSql()) + ).get(0); } else { - final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(), - F.asList(qry.getArgs()), filter, qry.isEnforceJoinOrder(), false, qry.getTimeout(), cancel, - new StaticMvccQueryTracker(cctx, mvccSnapshot)); + int[] cacheIds = new int[1]; + cacheIds[0] = cctx.cacheId(); + + final GridQueryFieldsResult res = idx.queryLocalSqlFields( + cacheIds, + schema, + plan.selectQuery(), + F.asList(qry.getArgs()), + filter, + qry.isEnforceJoinOrder(), + false, + qry.getTimeout(), + cancel, + new StaticMvccQueryTracker(cctx, mvccSnapshot) + ); + cur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { try { Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMetricsCollector; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; @@ -80,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.QueryTable; @@ -102,6 +104,7 @@ import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.SqlClientContext; +import org.apache.ignite.internal.processors.query.TaskSqlFields; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; @@ -254,7 +257,7 @@ ";ROW_FACTORY=\"" + GridH2PlainRowFactory.class.getName() + "\"" + ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName(); - // Uncomment this setting to get debug output from H2 to sysout. + // Uncomment this setting to get debug output from H2 to sysout. // ";TRACE_LEVEL_SYSTEM_OUT=3"; /** Dummy metadata for update result. */ @@ -686,6 +689,7 @@ /** * Execute statement on H2 INFORMATION_SCHEMA. + * * @param sql SQL statement. */ public void executeSystemStatement(String sql) { @@ -763,7 +767,7 @@ if (tbl == null) return; // Type was rejected. - tbl.table().update(row, prevRow, prevRowAvailable); + tbl.table().update(row, prevRow, prevRowAvailable); if (tbl.luceneIndex() != null) { long expireTime = row.expireTime(); @@ -1036,33 +1040,13 @@ /** * Queries individual fields (generally used by JDBC drivers). * + * @param cacheIds cache ids involved * @param schemaName Schema name. * @param qry Query. * @param params Query parameters. * @param filter Cache name and key filter. * @param enforceJoinOrder Enforce join order of tables in the query. * @param startTx Start transaction flag. - * @param timeout Query timeout in milliseconds. - * @param cancel Query cancel. - * @return Query result. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - public GridQueryFieldsResult queryLocalSqlFields(String schemaName, String qry, @Nullable Collection params, - IndexingQueryFilter filter, boolean enforceJoinOrder, boolean startTx, int timeout, - GridQueryCancel cancel) throws IgniteCheckedException { - return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null); - } - - /** - * Queries individual fields (generally used by JDBC drivers). - * - * @param schemaName Schema name. - * @param qry Query. - * @param params Query parameters. - * @param filter Cache name and key filter. - * @param enforceJoinOrder Enforce join order of tables in the query. - * @param startTx Start transaction flag. * @param qryTimeout Query timeout in milliseconds. * @param cancel Query cancel. * @param mvccTracker Query tracker. @@ -1070,11 +1054,18 @@ * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - GridQueryFieldsResult queryLocalSqlFields(final String schemaName, String qry, - @Nullable final Collection params, final IndexingQueryFilter filter, boolean enforceJoinOrder, - boolean startTx, int qryTimeout, final GridQueryCancel cancel, - MvccQueryTracker mvccTracker) throws IgniteCheckedException { - + GridQueryFieldsResult queryLocalSqlFields( + @NotNull final int[] cacheIds, + final String schemaName, + String qry, + @Nullable final Collection params, + final IndexingQueryFilter filter, + boolean enforceJoinOrder, + boolean startTx, + int qryTimeout, + final GridQueryCancel cancel, + MvccQueryTracker mvccTracker + ) throws IgniteCheckedException { GridNearTxLocal tx = null; boolean mvccEnabled = mvccEnabled(kernalContext()); @@ -1109,7 +1100,7 @@ IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } - final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) + final GridH2QueryContext qctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) .filter(filter).distributedJoinMode(OFF); boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(p); @@ -1127,7 +1118,7 @@ mvccTracker = mvccTracker(stmt, startTx); if (mvccTracker != null) { - ctx.mvccSnapshot(mvccTracker.snapshot()); + qctx.mvccSnapshot(mvccTracker.snapshot()); tx = checkActive(tx(this.ctx)); @@ -1189,8 +1180,13 @@ @Override public GridCloseableIterator> iterator() throws IgniteCheckedException { assert GridH2QueryContext.get() == null; - GridH2QueryContext.set(ctx); + GridCacheMetricsCollector metrCol = GridCacheMetricsCollector.Default.create(); + + Throwable error = null; + + GridH2QueryContext.set(qctx); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry0, SQL_FIELDS, schemaName, U.currentTimeMillis(), cancel, true); @@ -1251,12 +1247,24 @@ e.addSuppressed(e0); } + error = e; + throw e; } finally { GridH2QueryContext.clearThreadLocal(); runs.remove(run.id()); + + metrCol.finish(); + + for (int id : cacheIds) + metrCol.collectMetrics( + SQL_FIELDS, + qry0, + ctx.cache().context().cacheContext(id).queries(), + error + ); } } }; @@ -1529,27 +1537,31 @@ } /** {@inheritDoc} */ - @Override public FieldsQueryCursor> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry, - final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException { - String sql = qry.getSql(); - List params = F.asList(qry.getArgs()); - boolean enforceJoinOrder = qry.isEnforceJoinOrder(), startTx = autoStartTx(qry); - int timeout = qry.getTimeout(); + @Override public FieldsQueryCursor> queryLocalSqlFields(TaskSqlFields t) throws IgniteCheckedException { + final GridQueryFieldsResult res = queryLocalSqlFields( + t.cacheIds(), + t.schemaName(), + t.qry().getSql(), + F.asList(t.qry().getArgs()), + t.filter(), + t.qry().isEnforceJoinOrder(), + autoStartTx(t.qry()), + t.qry().getTimeout(), + t.cancel(), + t.tracker() + ); - final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, params, filter, - enforceJoinOrder, startTx, timeout, cancel); - QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { @SuppressWarnings("NullableProblems") @Override public Iterator> iterator() { try { - return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary); + return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), t.keepBinary()); } catch (IgniteCheckedException e) { throw new IgniteException(e); } } - }, cancel); + }, t.cancel()); cursor.fieldsMeta(res.metaData()); @@ -1755,52 +1767,92 @@ } /** - * @param schemaName Schema name. - * @param qry Query. - * @param keepCacheObj Flag to keep cache object. - * @param enforceJoinOrder Enforce join order of tables. - * @param startTx Start transaction flag. - * @param qryTimeout Query timeout. - * @param cancel Cancel object. - * @param params Query parameters. - * @param parts Partitions. - * @param lazy Lazy query execution flag. - * @param mvccTracker Query tracker. - * @return Iterable result. + * Run distributed query on detected set of partitions. + * + * @param t Parameter holder + * @return Cursor representing distributed query result. */ - private Iterable> runQueryTwoStep( - final String schemaName, - final GridCacheTwoStepQuery qry, - final boolean keepCacheObj, - final boolean enforceJoinOrder, - boolean startTx, - final int qryTimeout, - final GridQueryCancel cancel, - final Object[] params, - final int[] parts, - final boolean lazy, - MvccQueryTracker mvccTracker) { - assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds()); + private FieldsQueryCursor> runQueryTwoStep(TaskSqlFields t){ + if (log.isDebugEnabled()) + log.debug("Parsed query: `" + t.qry().getSql() + "` into two step query: " + t.twoStepQuery()); + + t.twoStepQuery().pageSize(t.qry().getPageSize()); + + if (t.cancel() == null) + t.addGridQueryCancel(new GridQueryCancel()); + + int parts0[] = t.qry().getPartitions(); + + if (parts0 == null && t.twoStepQuery().derivedPartitions() != null) { - try { + try { - final MvccQueryTracker tracker = mvccTracker == null && qry.mvccEnabled() ? - MvccUtils.mvccTracker(ctx.cache().context().cacheContext(qry.cacheIds().get(0)), startTx) : mvccTracker; + parts0 = calculateQueryPartitions(t.twoStepQuery().derivedPartitions(), t.qry().getArgs()); + } + catch (IgniteCheckedException e) { + throw new CacheException("Failed to calculate derived partitions: [qry=" + t.qry().getSql() + ", params=" + + Arrays.deepToString(t.qry().getArgs()) + "]", e); + } + } + final int parts[] = parts0; + + assert !t.twoStepQuery().mvccEnabled() || !F.isEmpty(t.twoStepQuery().cacheIds()); + + try { + if (t.tracker() == null && t.twoStepQuery().mvccEnabled()) + t.addMvccQueryTracker(MvccUtils.mvccTracker( + ctx.cache().context().cacheContext(t.twoStepQuery().cacheIds().get(0)), t.startTx()) + ); + GridNearTxLocal tx = tx(ctx); - if (qry.forUpdate()) - qry.forUpdate(checkActive(tx) != null); + if (t.twoStepQuery().forUpdate()) + t.twoStepQuery().forUpdate(checkActive(tx) != null); - int opTimeout = operationTimeout(qryTimeout, tx); + int opTimeout = operationTimeout(t.qry().getTimeout(), tx); - return new Iterable>() { + QueryCursorImpl> cursor = new QueryCursorImpl<>( + new Iterable>() { @SuppressWarnings("NullableProblems") @Override public Iterator> iterator() { - return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout, - cancel, params, parts, lazy, tracker); + Throwable err = null; + + final GridCacheMetricsCollector metrCol = GridCacheMetricsCollector.Default.create(); + + try { + return rdcQryExec.query( + t.schemaName(), + t.twoStepQuery(), + t.keepBinary(), + t.qry().isEnforceJoinOrder(), + opTimeout, + t.cancel(), + t.qry().getArgs(), + parts, + t.qry().isLazy(), + t.tracker() + ); + }catch (Throwable e){ + err = e; + throw e; + }finally { + metrCol.finish(); + + for(int id : t.cacheIds()) + metrCol.collectMetrics( + t.queryTypeSubmitted(), + t.querySubmitted(), + ctx.cache().context().cacheContext(id).queries(), + err + ); - } + } - }; - } + } + },t.cancel()); + + cursor.fieldsMeta(t.meta()); + + return cursor; + } catch (IgniteCheckedException e) { throw new CacheException(e); } @@ -1859,8 +1911,15 @@ if (qry.getTimeout() > 0) fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); - final QueryCursor> res = - querySqlFields(schemaName, fqry, null, keepBinary, true, null, null).get(0); + final QueryCursor> res = querySqlFields(TaskSqlFields + .create() + .addSchemaName(schemaName) + .addSqlFieldsQuery(fqry) + .addKeepBinary(keepBinary) + .addFailOnMultipleStmts(true) + .addQuerySubmitted(qry.getSql()) + .addQueryTypeSubmitted(GridCacheQueryType.SQL) + ).get(0); final Iterable> converted = new Iterable>() { @Override public Iterator> iterator() { @@ -2106,40 +2165,46 @@ /** {@inheritDoc} */ @SuppressWarnings({"StringEquality", "unchecked"}) - @Override public List>> querySqlFields(String schemaName, SqlFieldsQuery qry, - @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, - GridQueryCancel cancel) { - boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry); + @Override public List>> querySqlFields(TaskSqlFields t) { + final SqlFieldsQuery qry = t.qry(); - final long startTime = U.currentTimeMillis(); + boolean mvccEnabled = mvccEnabled(ctx); + t.addStartTx(autoStartTx(qry)); + Throwable error = null; List>> res = null; + int[] cacheIds = new int[0]; + try { - res = tryQueryDistributedSqlFieldsNative(schemaName, qry, cliCtx); + res = tryQueryDistributedSqlFieldsNative(t.schemaName(), qry, t.cliCtx()); if (res != null) return res; { // First, let's check if we already have a two-step query for this statement... - H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, qry.getSql(), + H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(t.schemaName(), qry.getSql(), qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), qry.isLocal()); H2TwoStepCachedQuery cachedQry; if ((cachedQry = twoStepCache.get(cachedQryKey)) != null) { - checkQueryType(qry, true); + checkQueryType(t.qry(), true); GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy(); List meta = cachedQry.meta(); - res = Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary, - startTx, tracker, cancel)); + t.addTwoStepQry(twoStepQry).addMeta(meta); + if (!F.isEmpty(twoStepQry.cacheIds())) + t.addCacheIds(twoStepQry.cacheIds().stream().mapToInt(Integer::intValue).toArray()); + + res = Collections.singletonList(runQueryTwoStep(t)); + if (!twoStepQry.explain()) twoStepCache.putIfAbsent(cachedQryKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy())); @@ -2151,13 +2216,14 @@ // Second, let's check if we already have a parsed statement... PreparedStatement cachedStmt; - if ((cachedStmt = cachedStatement(connectionForSchema(schemaName), qry.getSql())) != null) { + if ((cachedStmt = cachedStatement(connectionForSchema(t.schemaName()), qry.getSql())) != null) { Prepared prepared = GridSqlQueryParser.prepared(cachedStmt); // We may use this cached statement only for local queries and non queries. if (qry.isLocal() || !prepared.isQuery()) - return (List>>)doRunPrepared(schemaName, prepared, qry, null, null, - keepBinary, startTx, tracker, cancel); + return (List>>)doRunPrepared( + t.addPreparedCmd(prepared).addTwoStepQry(null).addMeta(null) + ); } } @@ -2168,8 +2234,11 @@ String remainingSql = qry.getSql(); while (remainingSql != null) { - ParsingResult parseRes = parseAndSplit(schemaName, - remainingSql != qry.getSql() ? cloneFieldsQuery(qry).setSql(remainingSql) : qry, firstArg); + ParsingResult parseRes = parseAndSplit( + t.schemaName(), + remainingSql != qry.getSql() ? cloneFieldsQuery(qry).setSql(remainingSql) : qry, + firstArg + ); // Let's avoid second reflection getter call by returning Prepared object too Prepared prepared = parseRes.prepared(); @@ -2182,13 +2251,20 @@ remainingSql = parseRes.remainingSql(); - if (remainingSql != null && failOnMultipleStmts) + if (remainingSql != null && t.failOnMultipleStmts()) throw new IgniteSQLException("Multiple statements queries are not supported"); + else + cacheIds = parseRes.cacheIds(); firstArg += prepared.getParameters().size(); - res.addAll(doRunPrepared(schemaName, prepared, newQry, twoStepQry, meta, keepBinary, startTx, tracker, - cancel)); + res.addAll(doRunPrepared(t. + addPreparedCmd(prepared) + .addTwoStepQry(twoStepQry) + .addMeta(meta) + .addSqlFieldsQuery(newQry) + .addCacheIds(cacheIds) + )); if (parseRes.twoStepQuery() != null && parseRes.twoStepQueryKey() != null && !parseRes.twoStepQuery().explain()) @@ -2204,65 +2280,34 @@ if (mvccEnabled && (tx = tx(ctx)) != null) tx.setRollbackOnly(); - error = e; - throw e; - } finally { - long duration = U.currentTimeMillis() - startTime; - - boolean failed = error != null; - - H2TwoStepCachedQuery query = twoStepCache.get(new H2TwoStepCachedQueryKey(schemaName, qry.getSql(), - qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), qry.isLocal())); - - if (query != null) - for (Integer cacheId: query.query().cacheIds()) { - GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); - - if (cctx != null) - cctx.queries().collectMetrics(SQL_FIELDS, qry.getSql(), startTime, duration, failed); - } + } - - if (log.isTraceEnabled()) - log.trace("Query execution [startTime=" + startTime + ", duration=" + duration + - ", fail=" + failed + ", res=" + res + ']'); - } + } - } /** * Execute an all-ready {@link SqlFieldsQuery}. - * @param schemaName Schema name. - * @param prepared H2 command. - * @param qry Fields query with flags. - * @param twoStepQry Two-step query if this query must be executed in a distributed way. - * @param meta Metadata for {@code twoStepQry}. - * @param keepBinary Whether binary objects must not be deserialized automatically. - * @param startTx Start transactionq flag. - * @param tracker MVCC tracker. - * @param cancel Query cancel state holder. + * + * @param t Parameter holder. * @return Query result. */ @SuppressWarnings("unchecked") - private List>> doRunPrepared(String schemaName, Prepared prepared, - SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List meta, boolean keepBinary, - boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel) { + private List>> doRunPrepared(TaskSqlFields t) { + final String sqlQry = t.qry().getSql(); - String sqlQry = qry.getSql(); + final boolean loc = t.qry().isLocal(); - boolean loc = qry.isLocal(); + IndexingQueryFilter filter = (loc ? backupFilter(null, t.qry().getPartitions()) : null); - IndexingQueryFilter filter = (loc ? backupFilter(null, qry.getPartitions()) : null); - - if (!prepared.isQuery()) { - if (DmlStatementsProcessor.isDmlStatement(prepared)) { + if (!t.preparedCommand().isQuery()) { + if (DmlStatementsProcessor.isDmlStatement(t.preparedCommand())) { try { - Connection conn = connectionForSchema(schemaName); + Connection conn = connectionForSchema(t.schemaName()); if (!loc) - return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel); + return dmlProc.updateSqlFieldsDistributed(t.schemaName(), conn, t.preparedCommand(), t.qry(), t.cancel()); else { final GridQueryFieldsResult updRes = - dmlProc.updateSqlFieldsLocal(schemaName, conn, prepared, qry, filter, cancel); + dmlProc.updateSqlFieldsLocal(t.schemaName(), conn, t.preparedCommand(), t.qry(), filter, t.cancel()); return Collections.singletonList(new QueryCursorImpl<>(new Iterable>() { @SuppressWarnings("NullableProblems") @@ -2275,24 +2320,24 @@ throw new IgniteException(e); } } - }, cancel)); + }, t.cancel())); } } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); + ", params=" + Arrays.deepToString(t.qry().getArgs()) + "]", e); } } - if (DdlStatementsProcessor.isDdlStatement(prepared)) { + if (DdlStatementsProcessor.isDdlStatement(t.preparedCommand())) { if (loc) throw new IgniteSQLException("DDL statements are not supported for LOCAL caches", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared)); + return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, t.preparedCommand())); } - if (prepared instanceof NoOperation) { + if (t.preparedCommand() instanceof NoOperation) { QueryCursorImpl> resCur = (QueryCursorImpl>)new QueryCursorImpl( Collections.singletonList(Collections.singletonList(0L)), null, false); @@ -2301,27 +2346,27 @@ return Collections.singletonList(resCur); } - throw new IgniteSQLException("Unsupported DDL/DML operation: " + prepared.getClass().getName(), + throw new IgniteSQLException("Unsupported DDL/DML operation: " + t.preparedCommand().getClass().getName(), IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } - if (twoStepQry != null) { + if (t.twoStepQuery() != null) { if (log.isDebugEnabled()) - log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry); + log.debug("Parsed query: `" + sqlQry + "` into two step query: " + t.twoStepQuery()); - checkQueryType(qry, true); + checkQueryType(t.qry(), true); - return Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary, - startTx, tracker, cancel)); + return Collections.singletonList(runQueryTwoStep(t)); } // We've encountered a local query, let's just run it. try { - return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel)); + t.addFilter(filter); + return Collections.singletonList(queryLocalSqlFields(t)); } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); + ", params=" + Arrays.deepToString(t.qry().getArgs()) + "]", e); } } @@ -2375,7 +2420,9 @@ args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt); } + int[] cacheIds = new int[0]; + - if (prepared.isQuery()) { + if (prepared.isQuery()) { try { bindParameters(stmt, F.asList(args)); } @@ -2386,13 +2433,11 @@ Arrays.deepToString(args) + "]", IgniteQueryErrorCode.PARSING, e); } - GridSqlQueryParser parser = null; + GridSqlQueryParser parser = new GridSqlQueryParser(false); - if (!loc) { - parser = new GridSqlQueryParser(false); - - GridSqlStatement parsedStmt = parser.parse(prepared); + GridSqlStatement parsedStmt = parser.parse(prepared); + if (!loc) { // Legit assertion - we have H2 query flag above. assert parsedStmt instanceof GridSqlQuery; @@ -2400,12 +2445,6 @@ } if (loc) { - if (parser == null) { - parser = new GridSqlQueryParser(false); - - parser.parse(prepared); - } - GridCacheContext cctx = parser.getFirstPartitionedCache(); if (cctx != null && cctx.config().getQueryParallelism() > 1) { @@ -2414,6 +2453,8 @@ qry.setDistributedJoins(true); } } + + cacheIds = parser.getCacheIdsOfTables(); } SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args); @@ -2425,7 +2466,15 @@ getStatementsCacheForCurrentThread().remove(schemaName, qry.getSql()); if (!hasTwoStep) - return new ParsingResult(prepared, newQry, remainingSql, null, null, null); + return new ParsingResult( + cacheIds, + prepared, + newQry, + remainingSql, + null, + null, + null + ); final UUID locNodeId = ctx.localNodeId(); @@ -2443,7 +2492,15 @@ List meta = cachedQry.meta(); - return new ParsingResult(prepared, newQry, remainingSql, twoStepQry, cachedQryKey, meta); + return new ParsingResult( + cacheIds, + prepared, + newQry, + remainingSql, + twoStepQry, + cachedQryKey, + meta + ); } try { @@ -2453,8 +2510,15 @@ try { GridCacheTwoStepQuery twoStepQry = split(prepared, newQry); - return new ParsingResult(prepared, newQry, remainingSql, twoStepQry, - cachedQryKey, H2Utils.meta(stmt.getMetaData())); + return new ParsingResult( + cacheIds, + prepared, + newQry, + remainingSql, + twoStepQry, + cachedQryKey, + H2Utils.meta(stmt.getMetaData()) + ); } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" + @@ -2571,50 +2635,6 @@ } /** - * Run distributed query on detected set of partitions. - * @param schemaName Schema name. - * @param qry Original query. - * @param twoStepQry Two-step query. - * @param meta Metadata to set to cursor. - * @param keepBinary Keep binary flag. - * @param startTx Start transaction flag. - * @param mvccTracker Query tracker. - * @param cancel Cancel handler. - * @return Cursor representing distributed query result. - */ - private FieldsQueryCursor> doRunDistributedQuery(String schemaName, SqlFieldsQuery qry, - GridCacheTwoStepQuery twoStepQry, List meta, boolean keepBinary, - boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel) { - if (log.isDebugEnabled()) - log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry); - - twoStepQry.pageSize(qry.getPageSize()); - - if (cancel == null) - cancel = new GridQueryCancel(); - - int partitions[] = qry.getPartitions(); - - if (partitions == null && twoStepQry.derivedPartitions() != null) { - try { - partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), qry.getArgs()); - } - catch (IgniteCheckedException e) { - throw new CacheException("Failed to calculate derived partitions: [qry=" + qry.getSql() + ", params=" + - Arrays.deepToString(qry.getArgs()) + "]", e); - } - } - - QueryCursorImpl> cursor = new QueryCursorImpl<>( - runQueryTwoStep(schemaName, twoStepQry, keepBinary, qry.isEnforceJoinOrder(), startTx, qry.getTimeout(), - cancel, qry.getArgs(), partitions, qry.isLazy(), mvccTracker), cancel); - - cursor.fieldsMeta(meta); - - return cursor; - } - - /** * Do initial parsing of the statement and create query caches, if needed. * @param c Connection. * @param sqlQry Query. @@ -2629,10 +2649,10 @@ } catch (SQLException e) { if (!cachesCreated && ( - e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) - ) { + e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 || + e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || + e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) + ) { try { ctx.cache().createMissingQueryCaches(); } @@ -3655,7 +3675,7 @@ ArrayList list = new ArrayList<>(partInfoList.length); - for (CacheQueryPartitionInfo partInfo: partInfoList) { + for (CacheQueryPartitionInfo partInfo : partInfoList) { int partId = (partInfo.partition() >= 0) ? partInfo.partition() : bindPartitionInfoParameter(partInfo, params); @@ -3696,7 +3716,7 @@ GridH2RowDescriptor desc = dataTable(schema(partInfo.cacheName()), partInfo.tableName()).rowDescriptor(); Object param = H2Utils.convert(params[partInfo.paramIdx()], - desc, partInfo.dataType()); + desc, partInfo.dataType()); return kernalContext().affinity().partition(partInfo.cacheName(), param); } Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -27,6 +27,9 @@ * Result of parsing and splitting SQL from {@link SqlFieldsQuery}. */ final class ParsingResult { + /** Cache Ids*/ + private final int[] cacheIds; + /** H2 command. */ private final Prepared prepared; @@ -47,8 +50,16 @@ private final List meta; /** Simple constructor. */ - ParsingResult(Prepared prepared, SqlFieldsQuery newQry, String remainingSql, GridCacheTwoStepQuery twoStepQry, - H2TwoStepCachedQueryKey twoStepQryKey, List meta) { + ParsingResult( + int[] cacheIds, + Prepared prepared, + SqlFieldsQuery newQry, + String remainingSql, + GridCacheTwoStepQuery twoStepQry, + H2TwoStepCachedQueryKey twoStepQryKey, + List meta + ) { + this.cacheIds = cacheIds; this.prepared = prepared; this.newQry = newQry; this.remainingSql = remainingSql; @@ -58,6 +69,14 @@ } /** + * + * @return Involved cache ids + */ + int[] cacheIds(){ + return cacheIds; + } + + /** * @return Metadata for two-step query, or {@code} null if this result is for local query. */ List meta() { Index: modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java =================================================================== --- modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -1755,6 +1755,23 @@ return null; } + public int[] getCacheIdsOfTables(){ + List res = new ArrayList<>(); + + for (Object o : h2ObjToGridObj.values()) { + if (o instanceof GridSqlAlias) + o = GridSqlAlias.unwrap((GridSqlAst)o); + + if (o instanceof GridSqlTable) { + GridH2Table tbl = ((GridSqlTable)o).dataTable(); + + if (tbl != null) + res.add(tbl.cacheId()); + } + } + return res.stream().mapToInt(Integer::intValue).toArray(); + } + /** * @param stmt Prepared statement. * @return Parsed AST. Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryDetailMetricsSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryDetailMetricsSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryDetailMetricsSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -133,19 +133,6 @@ } /** - * Test metrics for failed SQL queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN"); - - checkQueryFailedMetrics(cache, qry); - } - - /** * Test metrics eviction. * * @throws Exception In case of error. @@ -407,11 +394,13 @@ * @throws Exception In case of error. */ public void testSqlFieldsCrossCacheQueryMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); + IgniteCache cacheA = grid(0).context().cache().jcache("A"); + IgniteCache cacheB = grid(0).context().cache().jcache("B"); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String"); - checkQueryMetrics(cache, qry); + checkCrossCacheQueryMetrics(cacheA, cacheB, qry); } /** @@ -420,28 +409,17 @@ * @throws Exception In case of error. */ public void testSqlFieldsCrossCacheQueryNotFullyFetchedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); + IgniteCache cacheA = grid(0).context().cache().jcache("A"); + IgniteCache cacheB = grid(0).context().cache().jcache("B"); + SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String"); qry.setPageSize(10); - checkQueryNotFullyFetchedMetrics(cache, qry, false); + checkCrossCacheQueryMetrics(cacheA, cacheB, qry); } /** - * Test metrics for failed SQL cross cache queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsCrossCacheQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".String"); - - checkQueryFailedMetrics(cache, qry); - } - - /** * Check metrics. * * @param cache Cache to check metrics. @@ -452,7 +430,7 @@ * @param failures Expected number of failures. * @param first {@code true} if metrics checked for first query only. */ - private void checkMetrics(IgniteCache cache, int sz, int idx, int execs, + private void checkMetrics(IgniteCache cache, int sz, int idx, int execs, int completions, int failures, boolean first) { Collection metrics = cache.queryDetailMetrics(); @@ -490,6 +468,18 @@ checkMetrics(cache, 1, 0, 2, 2, 0, false); } + private void checkCrossCacheQueryMetrics(IgniteCache cacheToRun, IgniteCache cacheToCheck, Query qry) { + // Execute query. + cacheToRun.query(qry).getAll(); + + checkMetrics(cacheToCheck, 1, 0, 1, 1, 0, true); + + // Execute again with the same parameters. + cacheToRun.query(qry).getAll(); + + checkMetrics(cacheToCheck, 1, 0, 2, 2, 0, false); + } + /** * @param cache Cache. * @param qry Query. Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -129,19 +129,6 @@ } /** - * Test metrics for failed SQL queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN"); - - checkQueryFailedMetrics(cache, qry); - } - - /** * Test metrics for Scan queries. * * @throws Exception In case of error. @@ -271,7 +258,9 @@ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String"); - checkQueryMetrics(cache, qry); + IgniteCache cache2 = grid(0).context().cache().jcache("B"); + + checkCrossQueryMetrics(cache, cache2, qry); } /** @@ -285,20 +274,9 @@ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String"); qry.setPageSize(10); - checkQueryNotFullyFetchedMetrics(cache, qry, false); - } + IgniteCache cache2 = grid(0).context().cache().jcache("B"); - /** - * Test metrics for failed SQL cross cache queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsCrossCacheQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".String"); - - checkQueryFailedMetrics(cache, qry); + checkCrossQueryNotFullyFetchedMetrics(cache, cache2, qry, false); } /** */ @@ -358,7 +336,7 @@ * @param failures Expected number of failures. * @param first {@code true} if metrics checked for first query only. */ - private void checkMetrics(IgniteCache cache, int execs, int completions, int failures, boolean first) { + private void checkMetrics(IgniteCache cache, int execs, int completions, int failures, boolean first) { GridCacheQueryMetricsAdapter m = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); assertNotNull(m); @@ -380,7 +358,7 @@ * @param cache Cache. * @param qry Query. */ - private void checkQueryMetrics(IgniteCache cache, Query qry) { + private void checkQueryMetrics(IgniteCache cache, Query qry) { cache.query(qry).getAll(); checkMetrics(cache, 1, 1, 0, true); @@ -392,6 +370,22 @@ } /** + * @param cacheToRun Cache. + * @param cacheToTest Cache. + * @param qry Query. + */ + private void checkCrossQueryMetrics(IgniteCache cacheToRun, IgniteCache cacheToTest, Query qry) { + cacheToRun.query(qry).getAll(); + + checkMetrics(cacheToTest, 1, 1, 0, true); + + // Execute again with the same parameters. + cacheToRun.query(qry).getAll(); + + checkMetrics(cacheToTest, 2, 2, 0, false); + } + + /** * @param cache Cache. * @param qry Query. * @param waitingForCompletion Waiting for query completion. @@ -415,6 +409,30 @@ } /** + * @param cacheToRun Cache. + * @param cacheToCheck Cache. + * @param qry Query. + * @param waitingForCompletion Waiting for query completion. + */ + private void checkCrossQueryNotFullyFetchedMetrics(IgniteCache cacheToRun, IgniteCache cacheToCheck, Query qry, + boolean waitingForCompletion) throws IgniteInterruptedCheckedException { + cacheToRun.query(qry).iterator().next(); + + if (waitingForCompletion) + waitingForCompletion(cacheToRun, 1); + + checkMetrics(cacheToCheck, 1, 1, 0, true); + + // Execute again with the same parameters. + cacheToRun.query(qry).iterator().next(); + + if (waitingForCompletion) + waitingForCompletion(cacheToRun, 2); + + checkMetrics(cacheToCheck, 2, 2, 0, false); + } + + /** * @param cache Cache. * @param qry Query. */ @@ -443,7 +461,7 @@ * @param cache Cache. * @param exp Expected. */ - private static void waitingForCompletion(final IgniteCache cache, + private static void waitingForCompletion(final IgniteCache cache, final int exp) throws IgniteInterruptedCheckedException { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheAbstractQueryDetailMetricsSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheAbstractQueryDetailMetricsSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheAbstractQueryDetailMetricsSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -19,9 +19,12 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.List; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.QueryDetailMetrics; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -44,8 +47,11 @@ */ public abstract class CacheAbstractQueryDetailMetricsSelfTest extends GridCommonAbstractTest { /** */ - private static final int QRY_DETAIL_METRICS_SIZE = 3; + static final int QRY_DETAIL_METRICS_SIZE = 3; + /** */ + public static final int CACHED_VALUES = 100; + /** Grid count. */ protected int gridCnt; @@ -62,7 +68,7 @@ IgniteCache cacheA = grid(0).cache("A"); IgniteCache cacheB = grid(0).cache("B"); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < CACHED_VALUES; i++) { cacheA.put(i, String.valueOf(i)); cacheB.put(i, Integer.valueOf(i).longValue()); } @@ -94,7 +100,7 @@ * @param cacheName Cache name. * @return Cache configuration. */ - private CacheConfiguration configureCache2(String cacheName) { + protected CacheConfiguration configureCache2(String cacheName) { CacheConfiguration ccfg = defaultCacheConfiguration(); ccfg.setName(cacheName); @@ -103,6 +109,7 @@ ccfg.setIndexedTypes(Integer.class, Long.class); ccfg.setStatisticsEnabled(true); ccfg.setQueryDetailMetricsSize(QRY_DETAIL_METRICS_SIZE); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); return ccfg; } @@ -133,7 +140,7 @@ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Long"); - checkQueryMetrics(cache, cache2, qry); + checkDeferredQueryMetrics(cache, cache2, qry, CACHED_VALUES); } /** @@ -146,25 +153,13 @@ IgniteCache cache2 = grid(0).context().cache().jcache("B"); SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Long"); + qry.setPageSize(10); checkQueryNotFullyFetchedMetrics(cache, cache2, false, qry); } /** - * Test metrics for failed SQL queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from UNKNOWN"); - - checkQueryFailedMetrics(cache, qry); - } - - /** * Test metrics eviction. * * @throws Exception In case of error. @@ -201,7 +196,7 @@ String lastMetrics = ""; for (QueryDetailMetrics m : metrics) - lastMetrics += m.queryType() + " " + m.query() + ";"; + lastMetrics += m.queryType() + " " + m.query() + ";"; assertTrue(lastMetrics.contains("SQL_FIELDS select * from String limit 2;")); assertTrue(lastMetrics.contains("SCAN A;")); @@ -209,12 +204,12 @@ cache = grid(0).context().cache().jcache("B"); - cache.query(new SqlFieldsQuery("select * from String")).getAll(); - cache.query(new SqlFieldsQuery("select count(*) from String")).getAll(); - cache.query(new SqlFieldsQuery("select * from String limit 1")).getAll(); - cache.query(new SqlFieldsQuery("select * from String limit 2")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long")).getAll(); + cache.query(new SqlFieldsQuery("select count(*) from Long")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long limit 1")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long limit 2")).getAll(); cache.query(new ScanQuery()).getAll(); - cache.query(new SqlQuery("String", "from String")).getAll(); + cache.query(new SqlQuery("Long", "from Long")).getAll(); waitingFor(cache, "size", QRY_DETAIL_METRICS_SIZE); @@ -238,12 +233,12 @@ cache = grid(1).context().cache().jcache("B"); - cache.query(new SqlFieldsQuery("select * from String")).getAll(); - cache.query(new SqlFieldsQuery("select count(*) from String")).getAll(); - cache.query(new SqlFieldsQuery("select * from String limit 1")).getAll(); - cache.query(new SqlFieldsQuery("select * from String limit 2")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long")).getAll(); + cache.query(new SqlFieldsQuery("select count(*) from Long")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long limit 1")).getAll(); + cache.query(new SqlFieldsQuery("select * from Long limit 2")).getAll(); cache.query(new ScanQuery()).getAll(); - cache.query(new SqlQuery("String", "from String")).getAll(); + cache.query(new SqlQuery(Long.class, "from Long")).getAll(); waitingFor(cache, "size", QRY_DETAIL_METRICS_SIZE); @@ -253,12 +248,31 @@ } /** */ + public void testInsertMetrics() throws Exception { + IgniteCache cache = grid(0).context().cache().jcache("B"); + + SqlFieldsQuery qry = new SqlFieldsQuery( + "INSERT INTO LONG (_key, _val) values (-101,-101),(-102,-102),(-103,-103)"); + + checkQueryMetrics(cache, qry, 1); + } + + /** */ + public void testDeleteMetrics() throws Exception { + IgniteCache cache = grid(0).context().cache().jcache("B"); + + SqlFieldsQuery qry = new SqlFieldsQuery("DELETE FROM LONG Where _val='5'"); + + checkQueryMetrics(cache, qry, 1); + } + + /** */ private static class Worker extends Thread { /** */ private final IgniteCache cache; /** */ - private final Query qry; + private final Query qry; /** */ Worker(IgniteCache cache, Query qry) { @@ -310,7 +324,7 @@ SqlQuery qry = new SqlQuery<>("String", "from String"); - checkQueryMetrics(cache, qry); + checkDoubleQueryMetrics(cache, qry, CACHED_VALUES); } /** @@ -322,10 +336,12 @@ IgniteCache cache = grid(0).context().cache().jcache("A"); IgniteCache cache2 = grid(0).context().cache().jcache("B"); - SqlQuery qry = new SqlQuery<>("String", "from String"); + SqlQuery qry = new SqlQuery<>(Long.class, "Select * from \"B\".Long"); + qry.setPageSize(10); - checkQueryNotFullyFetchedMetrics(cache, cache2, true, qry); + //todo: Enable after fix of IGNITE-9771 + //checkQueryNotFullyFetchedMetrics(cache, cache2, true, qry); } /** @@ -351,7 +367,7 @@ TextQuery qry = new TextQuery<>("String", "1"); - checkQueryMetrics(cache, qry); + checkDoubleQueryMetrics(cache, qry, 1); } /** @@ -365,7 +381,7 @@ SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".Long"); - checkQueryMetrics(cache, cache2, qry); + checkDeferredQueryMetrics(cache, cache2, qry, CACHED_VALUES); } /** @@ -384,19 +400,6 @@ } /** - * Test metrics for failed SQL cross cache queries. - * - * @throws Exception In case of error. - */ - public void testSqlFieldsCrossCacheQueryFailedMetrics() throws Exception { - IgniteCache cache = grid(0).context().cache().jcache("A"); - - SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"G\".Long"); - - checkQueryFailedMetrics(cache, qry); - } - - /** * Check metrics. * * @param cache Cache to check metrics. @@ -407,58 +410,96 @@ * @param failures Expected number of failures. * @param first {@code true} if metrics checked for first query only. */ - private void checkMetrics(IgniteCache cache, int sz, int idx, int execs, - int completions, int failures, boolean first) { + private void checkMetrics(IgniteCache cache, + int sz, + int idx, + int execs, + int completions, + int failures, + boolean first) { Collection metrics = cache.queryDetailMetrics(); assertNotNull(metrics); assertEquals(sz, metrics.size()); + if (sz > 0) { - QueryDetailMetrics m = new ArrayList<>(metrics).get(idx); + QueryDetailMetrics m = new ArrayList<>(metrics).get(idx); - info("Metrics: " + m); + info("Metrics: " + m); - assertEquals("Executions", execs, m.executions()); - assertEquals("Completions", completions, m.completions()); - assertEquals("Failures", failures, m.failures()); - assertTrue(m.averageTime() >= 0); - assertTrue(m.maximumTime() >= 0); - assertTrue(m.minimumTime() >= 0); + assertEquals("Executions", execs, m.executions()); + assertEquals("Completions", completions, m.completions()); + assertEquals("Failures", failures, m.failures()); + assertTrue(m.averageTime() >= 0); + assertTrue(m.maximumTime() >= 0); + assertTrue(m.minimumTime() >= 0); - if (first) - assertTrue("On first execution minTime == maxTime", m.minimumTime() == m.maximumTime()); - } + if (first) + assertTrue("On first execution minTime == maxTime", m.minimumTime() == m.maximumTime()); + } + } /** * @param cache Cache. * @param qry Query. */ - private void checkQueryMetrics(IgniteCache cache, Query qry) { + private void checkDoubleQueryMetrics(IgniteCache cache, Query qry, int expResSize) { // Execute query. - cache.query(qry).getAll(); + checkQueryMetrics(cache, qry, expResSize, 1); - checkMetrics(cache, 1, 0, 1, 1, 0, true); - // Execute again with the same parameters. - cache.query(qry).getAll(); + checkQueryMetrics(cache, qry, expResSize, 2); + } - checkMetrics(cache, 1, 0, 2, 2, 0, false); + /** + * @param cache Cache. + * @param qry Query. + */ + private void checkQueryMetrics(IgniteCache cache, Query qry, int expResSize) { + checkQueryMetrics(cache, qry, expResSize, 1); } /** * @param cache Cache. + * @param qry Query. + */ + private void checkQueryMetrics(IgniteCache cache, Query qry, int expResSize, int number) { + // Execute query. + QueryCursor cur = cache.query(qry); + + List res1 = cur.getAll(); + + assertEquals(expResSize, res1.size()); + + checkMetrics(cache, 1, 0, number, number, 0, number<=1); + } + + /** + * @param cache Cache. * @param cache2 Cache. * @param qry Query. */ - private void checkQueryMetrics(IgniteCache cache, IgniteCache cache2, Query qry) { + private void checkDeferredQueryMetrics(IgniteCache cache, IgniteCache cache2, Query qry, int expResSize) { // Execute query. - cache.query(qry).getAll(); + QueryCursor cur = cache.query(qry); + checkMetrics(cache, 0, 0, 0, 0, 0, true); + + checkMetrics(cache2, 0, 0, 0, 0, 0, true); + + List res1 = cur.getAll(); + + assertEquals(expResSize, res1.size()); + + checkMetrics(cache, 0, 0, 0, 0, 0, true); + checkMetrics(cache2, 1, 0, 1, 1, 0, true); // Execute again with the same parameters. - cache.query(qry).getAll(); + List res2 = cache.query(qry).getAll(); + assertEquals(expResSize, res2.size()); + checkMetrics(cache2, 1, 0, 2, 2, 0, false); } @@ -468,8 +509,8 @@ * @param waitingForCompletion Waiting for query completion. * @param qry Query. */ - private void checkQueryNotFullyFetchedMetrics(IgniteCache cache, - IgniteCache cache2, boolean waitingForCompletion, Query qry) throws IgniteInterruptedCheckedException { + private void checkQueryNotFullyFetchedMetrics(IgniteCache cache, + IgniteCache cache2, boolean waitingForCompletion, Query qry) throws IgniteInterruptedCheckedException { // Execute query. cache.query(qry).iterator().next(); @@ -518,7 +559,7 @@ * @param cond Condition to check. * @param exp Expected value. */ - private static void waitingFor(final IgniteCache cache, + private static void waitingFor(final IgniteCache cache, final String cond, final int exp) throws IgniteInterruptedCheckedException { GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheLocalQueryDetailMetricsSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheLocalQueryDetailMetricsSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metrics/CacheLocalQueryDetailMetricsSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -17,7 +17,10 @@ package org.apache.ignite.internal.processors.cache.metrics; +import org.apache.ignite.configuration.CacheConfiguration; + import static org.apache.ignite.cache.CacheMode.LOCAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * Tests for local cache query metrics. @@ -30,4 +33,21 @@ super.beforeTest(); } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + protected CacheConfiguration configureCache2(String cacheName) { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setName(cacheName); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setIndexedTypes(Integer.class, Long.class); + ccfg.setStatisticsEnabled(true); + ccfg.setQueryDetailMetricsSize(QRY_DETAIL_METRICS_SIZE); + + return ccfg; -} + } +} Index: modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -173,9 +173,9 @@ */ private BinaryObjectBuilder aa(String typeName, long id, String name, int age) { BinaryObjectBuilder aBuilder = ignite0.binary().builder(typeName) - .setField("id", id) - .setField("name", name) - .setField("age", age); + .setField("id", id) + .setField("name", name) + .setField("age", age); return aBuilder; } @@ -342,11 +342,21 @@ // Fields query GridQueryFieldsResult fieldsRes = - spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " + - "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, false, 0, null); + spi.queryLocalSqlFields( + collectCacheIds(spi, "A"), + spi.schema("A"), + "select a.a.name n1, a.a.age a1, b.a.name n2, b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", + Collections.emptySet(), + null, + false, + false, + 0, + null, + null + ); String[] aliases = {"N1", "A1", "N2", "A2"}; - Object[] vals = { "Valera", 19, "Kolya", 25}; + Object[] vals = {"Valera", 19, "Kolya", 25}; IgniteSpiCloseableIterator> it = fieldsRes.iterator(); @@ -400,8 +410,18 @@ time = now; range *= 3; - GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.asList(1, - range), null, false, false, 0, null); + GridQueryFieldsResult res = spi.queryLocalSqlFields( + collectCacheIds(spi, "A"), + spi.schema("A"), + sql, + Arrays.asList(1, range), + null, + false, + false, + 0, + null, + null + ); assert res.iterator().hasNext(); @@ -417,6 +437,13 @@ } } + private int[] collectCacheIds(IgniteH2Indexing spi, String a) { + return spi.tables(a).stream() + .map(desc -> desc.table().cacheId()) + .mapToInt(Integer::intValue) + .toArray(); + } + /** * Index descriptor. */ @@ -483,7 +510,8 @@ * @param valFields Fields. * @param textIdx Fulltext index. */ - private TypeDesc(String cacheName, String schemaName, String name, Map> valFields, GridQueryIndexDescriptor textIdx) { + private TypeDesc(String cacheName, String schemaName, String name, Map> valFields, + GridQueryIndexDescriptor textIdx) { this.name = name; this.cacheName = cacheName; this.schemaName = schemaName; @@ -595,7 +623,8 @@ /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void setValue(String field, Object key, Object val, Object propVal) throws IgniteCheckedException { + @Override public void setValue(String field, Object key, Object val, Object propVal + ) throws IgniteCheckedException { assert !F.isEmpty(field); assert key instanceof Integer; @@ -759,7 +788,8 @@ } /** {@inheritDoc} */ - @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr + ) throws IgniteCheckedException { throw new UnsupportedOperationException(); } Index: modules/indexing/src/test/java/org/apache/ignite/sqltests/PartitionedSqlTest.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/sqltests/PartitionedSqlTest.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/sqltests/PartitionedSqlTest.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -50,7 +50,7 @@ assertContainsEq("Distributed join on 'idx = idx' returned unexpected result. " + "Preserve join order = " + forceOrder + ".", actIdxOnOn.values(), exp); assertContainsEq("Distributed join on 'noidx = idx' returned unexpected result. " + - "Preserve join order = " + forceOrder + ".", actIdxOffOn.values(), exp); + "Preserve join order = " + true + ".", actIdxOffOn.values(), exp); })); } Index: modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java =================================================================== --- modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java (revision 1f82d59126f96d62ae02794e6cb658c114c2bf21) +++ modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java (revision 4386ea363b1735ec4cf177b6b610a7cb39e613f3) @@ -410,6 +410,13 @@ suite.addTestSuite(CacheReplicatedQueryDetailMetricsDistributedSelfTest.class); suite.addTestSuite(CacheReplicatedQueryDetailMetricsLocalSelfTest.class); + // Cache query metrics. + suite.addTestSuite(org.apache.ignite.internal.processors.cache.metrics.CacheLocalQueryDetailMetricsSelfTest.class); + suite.addTestSuite(org.apache.ignite.internal.processors.cache.metrics.CachePartitionedQueryDetailMetricsDistributedSelfTest.class); + suite.addTestSuite(org.apache.ignite.internal.processors.cache.metrics.CachePartitionedQueryDetailMetricsLocalSelfTest.class); + suite.addTestSuite(org.apache.ignite.internal.processors.cache.metrics.CacheReplicatedQueryDetailMetricsDistributedSelfTest.class); + suite.addTestSuite(org.apache.ignite.internal.processors.cache.metrics.CacheReplicatedQueryDetailMetricsLocalSelfTest.class); + // Unmarshalling query test. suite.addTestSuite(IgniteCacheP2pUnmarshallingQueryErrorTest.class); suite.addTestSuite(IgniteCacheNoClassQuerySelfTest.class);